Browse Source

Internal: allow InternalEngine to be stopped and started

Once the current engine is started you can only close it once. Once closed the engine cannot be started again. This commit adds a stop method which signals the engine to free it's resources but in a way that allows restarting.

This is done by introducing InternalEngineHolder which is a wrapper around InternalEngine. This allows to add the stop() method without adding complexity the engine implementation. InternalEngineHolder also serves an entry point for listeners (incoming and outgoing) to other ES components, which removes the needs add/remove them if the engine is stopped.

Closes #8784
Boaz Leskes 11 years ago
parent
commit
83bb65a020
20 changed files with 869 additions and 425 deletions
  1. 4 6
      src/main/java/org/elasticsearch/common/lucene/LoggerInfoStream.java
  2. 7 2
      src/main/java/org/elasticsearch/index/engine/Engine.java
  3. 109 196
      src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
  4. 438 0
      src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java
  5. 1 1
      src/main/java/org/elasticsearch/index/engine/internal/InternalEngineModule.java
  6. 4 0
      src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java
  7. 7 8
      src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
  8. 2 2
      src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java
  9. 2 2
      src/test/java/org/elasticsearch/index/engine/internal/InternalEngineIntegrationTest.java
  10. 2 2
      src/test/java/org/elasticsearch/index/engine/internal/InternalEngineSettingsTest.java
  11. 42 32
      src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
  12. 34 39
      src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
  13. 13 13
      src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java
  14. 14 15
      src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java
  15. 33 36
      src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java
  16. 2 1
      src/test/java/org/elasticsearch/test/ElasticsearchSingleNodeTest.java
  17. 1 1
      src/test/java/org/elasticsearch/test/engine/MockEngineModule.java
  18. 24 59
      src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
  19. 119 0
      src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java
  20. 11 10
      src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java

+ 4 - 6
src/main/java/org/elasticsearch/common/lucene/LoggerInfoStream.java

@@ -22,8 +22,6 @@ package org.elasticsearch.common.lucene;
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.ShardId;
 
 /** An InfoStream (for Lucene's IndexWriter) that redirects
  *  messages to "lucene.iw.ifd" and "lucene.iw" Logger.trace. */
@@ -37,15 +35,15 @@ public final class LoggerInfoStream extends InfoStream {
     /** Logger for IndexFileDeleter */
     private final ESLogger ifdLogger;
 
-    public LoggerInfoStream(Settings settings, ShardId shardId) {
-        logger = Loggers.getLogger("lucene.iw", settings, shardId);
-        ifdLogger = Loggers.getLogger("lucene.iw.ifd", settings, shardId);
+    public LoggerInfoStream(ESLogger parentLogger) {
+        logger = Loggers.getLogger(parentLogger, ".lucene.iw");
+        ifdLogger = Loggers.getLogger(parentLogger, ".lucene.iw.ifd");
     }
 
     public void message(String component, String message) {
         getLogger(component).trace("{} {}: {}", Thread.currentThread().getName(), component, message);
     }
-  
+
     public boolean isEnabled(String component) {
         // TP is a special "test point" component; we don't want
         // to log it:

+ 7 - 2
src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -39,7 +39,6 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.shard.IndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 
@@ -48,11 +47,13 @@ import java.util.List;
 /**
  *
  */
-public interface Engine extends IndexShardComponent, CloseableComponent {
+public interface Engine extends CloseableComponent {
 
     static final String INDEX_CODEC = "index.codec";
     static ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");
 
+    ShardId shardId();
+
     /**
      * The default suggested refresh interval, -1 to disable it.
      */
@@ -72,6 +73,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
      */
     void start() throws EngineException;
 
+    /** Stops the engine but allow to re-start it */
+    void stop() throws EngineException;
+
     void create(Create create) throws EngineException;
 
     void index(Index index) throws EngineException;
@@ -496,6 +500,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
 
     static final class Create extends IndexingOperation {
         private final boolean autoGeneratedId;
+
         public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates, boolean autoGeneratedId) {
             super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
             this.autoGeneratedId = autoGeneratedId;

+ 109 - 196
src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

@@ -22,11 +22,7 @@ package org.elasticsearch.index.engine.internal;
 import com.google.common.collect.Lists;
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
-import org.apache.lucene.search.FilteredQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.BytesRef;
@@ -37,7 +33,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Preconditions;
-import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.ESLogger;
@@ -46,36 +41,15 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.math.MathUtils;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
-import org.elasticsearch.index.engine.CreateFailedEngineException;
-import org.elasticsearch.index.engine.DeleteByQueryFailedEngineException;
-import org.elasticsearch.index.engine.DeleteFailedEngineException;
-import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineAlreadyStartedException;
-import org.elasticsearch.index.engine.EngineClosedException;
-import org.elasticsearch.index.engine.EngineCreationFailureException;
-import org.elasticsearch.index.engine.EngineException;
-import org.elasticsearch.index.engine.FlushFailedEngineException;
-import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
-import org.elasticsearch.index.engine.IndexFailedEngineException;
-import org.elasticsearch.index.engine.OptimizeFailedEngineException;
-import org.elasticsearch.index.engine.RecoveryEngineException;
-import org.elasticsearch.index.engine.RefreshFailedEngineException;
-import org.elasticsearch.index.engine.Segment;
-import org.elasticsearch.index.engine.SegmentsStats;
-import org.elasticsearch.index.engine.SnapshotFailedEngineException;
-import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.engine.*;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.OnGoingMerge;
@@ -83,9 +57,6 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
 import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.settings.IndexSettingsService;
-import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.Store;
@@ -95,14 +66,7 @@ import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -115,14 +79,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  *
  */
-public class InternalEngine extends AbstractIndexShardComponent implements Engine {
+public class InternalEngine implements Engine {
+
+    protected final ESLogger logger;
+    protected final ShardId shardId;
 
     private volatile boolean failEngineOnCorruption;
     private volatile ByteSizeValue indexingBufferSize;
     private volatile int indexConcurrency;
-    private volatile boolean compoundOnFlush = true;
-
-    private long gcDeletesInMillis;
+    private volatile boolean compoundOnFlush;
+    private volatile long gcDeletesInMillis;
 
     /** When we last pruned expired tombstones from versionMap.deletes: */
     private volatile long lastDeleteVersionPruneTimeMSec;
@@ -134,7 +100,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
     private final ThreadPool threadPool;
 
     private final ShardIndexingService indexingService;
-    private final IndexSettingsService indexSettingsService;
     @Nullable
     private final InternalIndicesWarmer warmer;
     private final Store store;
@@ -181,38 +146,38 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     private final Object refreshMutex = new Object();
 
-    private final ApplySettings applySettings = new ApplySettings();
-
-    private volatile boolean failOnMergeFailure;
     private Throwable failedEngine = null;
     private final Lock failEngineLock = new ReentrantLock();
-    private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();
+    private final FailedEngineListener failedEngineListener;
 
     private final AtomicLong translogIdGenerator = new AtomicLong();
     private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
 
     private SegmentInfos lastCommittedSegmentInfos;
 
-    private IndexThrottle throttle;
+    private final IndexThrottle throttle;
 
-    @Inject
-    public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
-                          IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
+    public InternalEngine(ShardId shardId, ESLogger logger, CodecService codecService, ThreadPool threadPool,
+                          ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
                           Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
                           MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
-                          AnalysisService analysisService, SimilarityService similarityService, CodecService codecService) throws EngineException {
-        super(shardId, indexSettings);
+                          AnalysisService analysisService, SimilarityService similarityService,
+                          boolean enableGcDeletes, long gcDeletesInMillis, ByteSizeValue indexingBufferSize, String codecName,
+                          boolean compoundOnFlush, int indexConcurrency, boolean optimizeAutoGenerateId, boolean failEngineOnCorruption,
+                          FailedEngineListener failedEngineListener) throws EngineException {
         Preconditions.checkNotNull(store, "Store must be provided to the engine");
         Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
         Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
 
-        this.gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueSeconds(60)).millis();
-        this.indexingBufferSize = componentSettings.getAsBytesSize("index_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB)); // not really important, as it is set by the IndexingMemory manager
-        this.codecName = indexSettings.get(INDEX_CODEC, "default");
+        this.shardId = shardId;
+        this.logger = logger;
+        this.gcDeletesInMillis = gcDeletesInMillis;
+        this.enableGcDeletes = enableGcDeletes;
+        this.indexingBufferSize = indexingBufferSize;
+        this.codecName = codecName;
 
         this.threadPool = threadPool;
         this.lastDeleteVersionPruneTimeMSec = threadPool.estimatedTimeInMillis();
-        this.indexSettingsService = indexSettingsService;
         this.indexingService = indexingService;
         this.warmer = (InternalIndicesWarmer) warmer;
         this.store = store;
@@ -223,22 +188,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         this.analysisService = analysisService;
         this.similarityService = similarityService;
         this.codecService = codecService;
-        this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
-        this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
+        this.compoundOnFlush = compoundOnFlush;
+        this.indexConcurrency = indexConcurrency;
         this.versionMap = new LiveVersionMap();
         this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
         for (int i = 0; i < dirtyLocks.length; i++) {
             dirtyLocks[i] = new Object();
         }
-        this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
-
-        this.indexSettingsService.addListener(applySettings);
-        this.failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, true);
-        this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
-        if (failOnMergeFailure) {
-            this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
-        }
+        this.optimizeAutoGenerateId = optimizeAutoGenerateId;
+        this.failEngineOnCorruption = failEngineOnCorruption;
+        this.failedEngineListener = failedEngineListener;
+        // will be decremented in close()
         store.incRef();
+
+        throttle = new IndexThrottle();
     }
 
     @Override
@@ -273,7 +236,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     @Override
     public void addFailedEngineListener(FailedEngineListener listener) {
-        failedEngineListeners.add(listener);
+        throw new UnsupportedOperationException("addFailedEngineListener is not supported by InternalEngineImpl. Use InternalEngine.");
     }
 
     @Override
@@ -287,14 +250,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             if (closed) {
                 throw new EngineClosedException(shardId);
             }
+
             if (logger.isDebugEnabled()) {
                 logger.debug("starting engine");
             }
             try {
                 this.indexWriter = createWriter();
-                mergeScheduler.removeListener(this.throttle);
-                this.throttle = new IndexThrottle(mergeScheduler, logger, indexingService);
-                mergeScheduler.addListener(throttle);
             } catch (IOException e) {
                 maybeFailEngine(e, "start");
                 if (this.indexWriter != null) {
@@ -303,7 +264,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                         indexWriter = null;
                         pending.rollback();
                     } catch (IOException e1) {
-                       e.addSuppressed(e1);
+                        e.addSuppressed(e1);
                     }
                 }
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -343,13 +304,23 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         }
     }
 
+    @Override
+    public void stop() throws EngineException {
+        throw new UnsupportedOperationException("stop() is not supported by InternalEngineImpl. Use InternalEngine.");
+    }
+
     private void readLastCommittedSegmentsInfo() throws IOException {
         lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
     }
 
+    @Override
+    public ShardId shardId() {
+        return shardId;
+    }
+
     @Override
     public TimeValue defaultRefreshInterval() {
-        return new TimeValue(1, TimeUnit.SECONDS);
+        return InternalEngineHolder.DEFAULT_REFRESH_ITERVAL;
     }
 
     /** return the current indexing buffer size setting * */
@@ -362,6 +333,46 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         this.enableGcDeletes = enableGcDeletes;
     }
 
+    public void updateSettings(final long gcDeletesInMillis, final boolean compoundOnFlush, boolean failEngineOnCorruption, final int indexConcurrency, final String codecName) {
+        ensureOpen();
+        if (this.gcDeletesInMillis != gcDeletesInMillis) {
+            logger.trace("[impl] updating gcDeletesInMillis from [{}] to [{}]", this.gcDeletesInMillis, gcDeletesInMillis);
+            this.gcDeletesInMillis = gcDeletesInMillis;
+        }
+        if (this.compoundOnFlush != compoundOnFlush) {
+            this.compoundOnFlush = compoundOnFlush;
+            logger.trace("[impl] updating compoundOnFlush from [{}] to [{}]", this.compoundOnFlush, compoundOnFlush);
+            indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
+        }
+        if (this.failEngineOnCorruption != failEngineOnCorruption) {
+            logger.trace("[impl] updating failEngineOnCorruption from [{}] to [{}]", this.failEngineOnCorruption, failEngineOnCorruption);
+            this.failEngineOnCorruption = failEngineOnCorruption;
+        }
+        if (indexConcurrency != this.indexConcurrency || !codecName.equals(this.codecName)) {
+            boolean requiresFlushing = false;
+            try (InternalLock _ = readLock.acquire()) {
+                if (indexConcurrency != this.indexConcurrency) {
+                    logger.trace("[impl] updating indexConcurrency from [{}] to [{}]", this.indexConcurrency, indexConcurrency);
+                    this.indexConcurrency = indexConcurrency;
+                    // we have to flush in this case, since it only applies on a new index writer
+                    requiresFlushing = true;
+                }
+                if (!codecName.equals(this.codecName)) {
+                    logger.trace("[impl] updating codecName from [{}] to [{}]", this.codecName, codecName);
+                    this.codecName = codecName;
+                    // we want to flush in this case, so the new codec will be reflected right away...
+                    requiresFlushing = true;
+                }
+
+            } finally {
+                if (requiresFlushing) {
+                    flush(new Flush().type(Flush.Type.NEW_WRITER));
+                }
+            }
+        }
+    }
+
+    @Override
     public GetResult get(Get get) throws EngineException {
         try (InternalLock _ = readLock.acquire()) {
             if (get.realtime()) {
@@ -863,10 +874,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                             indexWriter.rollback();
                         }
                         indexWriter = createWriter();
-                        mergeScheduler.removeListener(this.throttle);
-
-                        this.throttle = new IndexThrottle(mergeScheduler, this.logger, indexingService);
-                        mergeScheduler.addListener(throttle);
                         // commit on a just opened writer will commit even if there are no changes done to it
                         // we rely on that for the commit data translog id key
                         if (flushNeeded || flush.force()) {
@@ -1035,7 +1042,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             throw new OptimizeFailedEngineException(shardId, t);
         }
     }
-    
+
     private void waitForMerges(boolean flushAfter) {
         try {
             currentIndexWriter().waitForMerges();
@@ -1063,9 +1070,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 MergePolicy mp = writer.getConfig().getMergePolicy();
                 assert mp instanceof ElasticsearchMergePolicy : "MergePolicy is " + mp.getClass().getName();
                 if (optimize.upgrade()) {
-                    ((ElasticsearchMergePolicy)mp).setUpgradeInProgress(true);
+                    ((ElasticsearchMergePolicy) mp).setUpgradeInProgress(true);
                 }
-                
+
                 if (optimize.onlyExpungeDeletes()) {
                     writer.forceMergeDeletes(false);
                 } else if (optimize.maxNumSegments() <= 0) {
@@ -1081,7 +1088,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 optimizeMutex.set(false);
             }
         }
-        
+
         // wait for the merges outside of the read lock
         if (optimize.waitForMerge()) {
             waitForMerges(optimize.flush());
@@ -1092,6 +1099,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 public void onFailure(Throwable t) {
                     logger.error("Exception while waiting for merges asynchronously after optimize", t);
                 }
+
                 @Override
                 protected void doRun() throws Exception {
                     waitForMerges(true);
@@ -1180,9 +1188,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 failEngine("corrupt file detected source: [" + source + "]", t);
                 return true;
             } else {
-                logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
+                logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
             }
-        }else if (ExceptionsHelper.isOOM(t)) {
+        } else if (ExceptionsHelper.isOOM(t)) {
             failEngine("out of memory", t);
             return true;
         }
@@ -1211,7 +1219,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 }
                 stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
                 stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
-                stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB()*1024*1024));
+                stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024));
                 return stats;
             }
         }
@@ -1305,9 +1313,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             if (!closed) {
                 try {
                     closed = true;
-                    indexSettingsService.removeListener(applySettings);
                     this.versionMap.clear();
-                    this.failedEngineListeners.clear();
                     logger.debug("close searcherManager");
                     try {
                         IOUtils.close(searcherManager);
@@ -1327,8 +1333,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 } catch (Throwable e) {
                     logger.warn("failed to rollback writer on close", e);
                 } finally {
-                    indexWriter = null;
                     store.decRef();
+                    indexWriter = null;
                 }
             }
         }
@@ -1339,21 +1345,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         return this.indexWriter.getConfig();
     }
 
-    class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
-        @Override
-        public void onFailedMerge(MergePolicy.MergeException e) {
-            if (Lucene.isCorruptionException(e)) {
-                if (failEngineOnCorruption) {
-                    failEngine("corrupt file detected source: [merge]", e);
-                } else {
-                    logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, INDEX_FAIL_ON_CORRUPTION, failEngineOnCorruption);
-                }
-            } else {
-                failEngine("merge exception", e);
-            }
-        }
-    }
-
     @Override
     public void failEngine(String reason, Throwable failure) {
         assert failure != null;
@@ -1380,9 +1371,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                     logger.warn("failed engine [{}]", failure, reason);
                     // we must set a failure exception, generate one if not supplied
                     failedEngine = failure;
-                    for (FailedEngineListener listener : failedEngineListeners) {
-                        listener.onFailedEngine(shardId, reason, failure);
-                    }
+                    failedEngineListener.onFailedEngine(shardId, reason, failure);
                 } finally {
                     close();
                 }
@@ -1425,7 +1414,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             config.setCommitOnClose(false); // we by default don't commit on close
             config.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
             config.setIndexDeletionPolicy(deletionPolicy);
-            config.setInfoStream(new LoggerInfoStream(indexSettings, shardId));
+            config.setInfoStream(new LoggerInfoStream(logger));
             config.setMergeScheduler(mergeScheduler.newMergeScheduler());
             MergePolicy mergePolicy = mergePolicyProvider.getMergePolicy();
             // Give us the opportunity to upgrade old segments while performing
@@ -1475,63 +1464,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         }
     }
 
-    public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
-    public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
-    public static final String INDEX_GC_DELETES = "index.gc_deletes";
-    public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
-    public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
-
-
-    class ApplySettings implements IndexSettingsService.Listener {
-
-        @Override
-        public void onRefreshSettings(Settings settings) {
-            long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngine.this.gcDeletesInMillis)).millis();
-            if (gcDeletesInMillis != InternalEngine.this.gcDeletesInMillis) {
-                logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngine.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
-                InternalEngine.this.gcDeletesInMillis = gcDeletesInMillis;
-            }
-
-            final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngine.this.compoundOnFlush);
-            if (compoundOnFlush != InternalEngine.this.compoundOnFlush) {
-                logger.info("updating {} from [{}] to [{}]", InternalEngine.INDEX_COMPOUND_ON_FLUSH, InternalEngine.this.compoundOnFlush, compoundOnFlush);
-                InternalEngine.this.compoundOnFlush = compoundOnFlush;
-                indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
-            }
-
-            InternalEngine.this.failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
-            int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
-            boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
-            String codecName = settings.get(INDEX_CODEC, InternalEngine.this.codecName);
-            boolean requiresFlushing = false;
-            if (indexConcurrency != InternalEngine.this.indexConcurrency ||
-                    !codecName.equals(InternalEngine.this.codecName) ||
-                    failOnMergeFailure != InternalEngine.this.failOnMergeFailure) {
-                try (InternalLock _ = readLock.acquire()) {
-                    if (indexConcurrency != InternalEngine.this.indexConcurrency) {
-                        logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency);
-                        InternalEngine.this.indexConcurrency = indexConcurrency;
-                        // we have to flush in this case, since it only applies on a new index writer
-                        requiresFlushing = true;
-                    }
-                    if (!codecName.equals(InternalEngine.this.codecName)) {
-                        logger.info("updating index.codec from [{}] to [{}]", InternalEngine.this.codecName, codecName);
-                        InternalEngine.this.codecName = codecName;
-                        // we want to flush in this case, so the new codec will be reflected right away...
-                        requiresFlushing = true;
-                    }
-                    if (failOnMergeFailure != InternalEngine.this.failOnMergeFailure) {
-                        logger.info("updating {} from [{}] to [{}]", InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure, failOnMergeFailure);
-                        InternalEngine.this.failOnMergeFailure = failOnMergeFailure;
-                    }
-                }
-                if (requiresFlushing) {
-                    flush(new Flush().type(Flush.Type.NEW_WRITER));
-                }
-            }
-        }
-    }
-
     private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException {
         final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter, true), shardId);
         return new SearcherManager(directoryReader, searcherFactory);
@@ -1723,54 +1655,35 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
         }
     }
 
+    public void activateThrottling() {
+        throttle.activate();
+    }
 
+    public void deactivateThrottling() {
+        throttle.deactivate();
+    }
 
-    static final class IndexThrottle implements MergeSchedulerProvider.Listener {
+    static final class IndexThrottle {
 
         private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
         private final InternalLock lockReference = new InternalLock(new ReentrantLock());
-        private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
-        private final AtomicBoolean isThrottling = new AtomicBoolean();
-        private final MergeSchedulerProvider mergeScheduler;
-        private final ESLogger logger;
-        private final ShardIndexingService indexingService;
 
         private volatile InternalLock lock = NOOP_LOCK;
 
-        public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger, ShardIndexingService indexingService) {
-            this.mergeScheduler = mergeScheduler;
-            this.logger = logger;
-            this.indexingService = indexingService;
-        }
 
         public Releasable acquireThrottle() {
             return lock.acquire();
         }
 
-        @Override
-        public synchronized void beforeMerge(OnGoingMerge merge) {
-            int maxNumMerges = mergeScheduler.getMaxMerges();
-            if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
-                if (isThrottling.getAndSet(true) == false) {
-                    logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
-                    indexingService.throttlingActivated();
-                }
-                lock = lockReference;
-            }
+        public void activate() {
+            assert lock == NOOP_LOCK : "throttling activated while already active";
+            lock = lockReference;
         }
 
-        @Override
-        public synchronized void afterMerge(OnGoingMerge merge) {
-            int maxNumMerges = mergeScheduler.getMaxMerges();
-            if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
-                if (isThrottling.getAndSet(false)) {
-                    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
-                    indexingService.throttlingDeactivated();
-                }
-                lock = NOOP_LOCK;
-            }
+        public void deactivate() {
+            assert lock != NOOP_LOCK : "throttling deactivated but not active";
+            lock = NOOP_LOCK;
         }
-
     }
 
     private static final class NoOpLock implements Lock {

+ 438 - 0
src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java

@@ -0,0 +1,438 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.engine.internal;
+
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Preconditions;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.engine.*;
+import org.elasticsearch.index.indexing.ShardIndexingService;
+import org.elasticsearch.index.merge.OnGoingMerge;
+import org.elasticsearch.index.merge.policy.MergePolicyProvider;
+import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.IndexShardComponent;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.similarity.SimilarityService;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.warmer.IndicesWarmer;
+import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class InternalEngineHolder extends AbstractIndexShardComponent implements IndexShardComponent, Engine, Engine.FailedEngineListener {
+
+
+    private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
+    private final ApplySettings settingsListener;
+    private final MergeScheduleListener mergeSchedulerListener;
+    private volatile Boolean failOnMergeFailure;
+    protected volatile boolean failEngineOnCorruption;
+    protected volatile ByteSizeValue indexingBufferSize;
+    protected volatile int indexConcurrency;
+    protected volatile boolean compoundOnFlush = true;
+
+    protected long gcDeletesInMillis;
+
+    protected volatile boolean enableGcDeletes = true;
+    protected volatile String codecName;
+    protected final boolean optimizeAutoGenerateId;
+
+    protected final ThreadPool threadPool;
+
+    protected final ShardIndexingService indexingService;
+    protected final IndexSettingsService indexSettingsService;
+    @Nullable
+    protected final InternalIndicesWarmer warmer;
+    protected final Store store;
+    protected final SnapshotDeletionPolicy deletionPolicy;
+    protected final Translog translog;
+    protected final MergePolicyProvider mergePolicyProvider;
+    protected final MergeSchedulerProvider mergeScheduler;
+    protected final AnalysisService analysisService;
+    protected final SimilarityService similarityService;
+    protected final CodecService codecService;
+
+    private final AtomicReference<InternalEngine> currentEngine = new AtomicReference<>();
+    private volatile boolean closed = false;
+
+    public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
+    public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
+    public static final String INDEX_GC_DELETES = "index.gc_deletes";
+    public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
+    public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
+
+    public static final TimeValue DEFAULT_REFRESH_ITERVAL = new TimeValue(1, TimeUnit.SECONDS);
+
+    private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();
+
+    @Inject
+    public InternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
+                                IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
+                                Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
+                                MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
+                                AnalysisService analysisService, SimilarityService similarityService, CodecService codecService) throws EngineException {
+        super(shardId, indexSettings);
+        Preconditions.checkNotNull(store, "Store must be provided to the engine");
+        Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
+        Preconditions.checkNotNull(translog, "Translog must be provided to the engine");
+
+        this.gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueSeconds(60)).millis();
+        this.indexingBufferSize = componentSettings.getAsBytesSize("index_buffer_size", new ByteSizeValue(64, ByteSizeUnit.MB)); // not really important, as it is set by the IndexingMemory manager
+        this.codecName = indexSettings.get(INDEX_CODEC, "default");
+
+        this.threadPool = threadPool;
+        this.indexSettingsService = indexSettingsService;
+        this.indexingService = indexingService;
+        this.warmer = (InternalIndicesWarmer) warmer;
+        this.store = store;
+        this.deletionPolicy = deletionPolicy;
+        this.translog = translog;
+        this.mergePolicyProvider = mergePolicyProvider;
+        this.mergeScheduler = mergeScheduler;
+        this.analysisService = analysisService;
+        this.similarityService = similarityService;
+        this.codecService = codecService;
+        this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
+        this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
+        this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
+
+        this.failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, true);
+        this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
+        this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
+        this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
+        this.mergeSchedulerListener = new MergeScheduleListener();
+        this.mergeScheduler.addListener(mergeSchedulerListener);
+
+        this.settingsListener = new ApplySettings();
+        this.indexSettingsService.addListener(this.settingsListener);
+
+    }
+
+    @Override
+    public TimeValue defaultRefreshInterval() {
+        return DEFAULT_REFRESH_ITERVAL;
+    }
+
+
+    public InternalEngine engineSafe() {
+        InternalEngine engine = currentEngine.get();
+        if (engine == null) {
+            throw new EngineClosedException(shardId);
+        }
+        return engine;
+    }
+
+    @Override
+    public void enableGcDeletes(boolean enableGcDeletes) {
+        this.enableGcDeletes = enableGcDeletes;
+        InternalEngine currentEngine = this.currentEngine.get();
+        if (currentEngine != null) {
+            currentEngine.enableGcDeletes(enableGcDeletes);
+        }
+    }
+
+    @Override
+    public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
+        this.indexingBufferSize = indexingBufferSize;
+        InternalEngine currentEngine = this.currentEngine.get();
+        if (currentEngine != null) {
+            currentEngine.updateIndexingBufferSize(indexingBufferSize);
+        }
+    }
+
+    @Override
+    public void addFailedEngineListener(FailedEngineListener listener) {
+        failedEngineListeners.add(listener);
+    }
+
+    @Override
+    public synchronized void start() throws EngineException {
+        if (closed) {
+            throw new EngineClosedException(shardId);
+        }
+        InternalEngine currentEngine = this.currentEngine.get();
+        if (currentEngine != null) {
+            throw new EngineAlreadyStartedException(shardId);
+        }
+        InternalEngine newEngine = createEngineImpl();
+        newEngine.start();
+        boolean success = this.currentEngine.compareAndSet(null, newEngine);
+        assert success : "engine changes should be done under a synchronize";
+    }
+
+    @Override
+    public synchronized void stop() throws EngineException {
+        InternalEngine currentEngine = this.currentEngine.getAndSet(null);
+        if (currentEngine != null) {
+            currentEngine.close();
+        }
+    }
+
+    @Override
+    public synchronized void close() throws ElasticsearchException {
+        closed = true;
+        InternalEngine currentEngine = this.currentEngine.getAndSet(null);
+        if (currentEngine != null) {
+            currentEngine.close();
+        }
+        mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
+        mergeScheduler.removeListener(mergeSchedulerListener);
+        indexSettingsService.removeListener(settingsListener);
+    }
+
+    protected InternalEngine createEngineImpl() {
+        return new InternalEngine(shardId, logger, codecService, threadPool, indexingService,
+                warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
+                enableGcDeletes, gcDeletesInMillis,
+                indexingBufferSize, codecName, compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, this);
+    }
+
+    @Override
+    public void create(Create create) throws EngineException {
+        engineSafe().create(create);
+    }
+
+    @Override
+    public void index(Index index) throws EngineException {
+        engineSafe().index(index);
+    }
+
+    @Override
+    public void delete(Delete delete) throws EngineException {
+        engineSafe().delete(delete);
+    }
+
+    @Override
+    public void delete(DeleteByQuery delete) throws EngineException {
+        engineSafe().delete(delete);
+    }
+
+    @Override
+    public GetResult get(Get get) throws EngineException {
+        return engineSafe().get(get);
+    }
+
+    @Override
+    public Searcher acquireSearcher(String source) throws EngineException {
+        return engineSafe().acquireSearcher(source);
+    }
+
+    @Override
+    public SegmentsStats segmentsStats() {
+        return engineSafe().segmentsStats();
+    }
+
+    @Override
+    public List<Segment> segments() {
+        return engineSafe().segments();
+    }
+
+    @Override
+    public boolean refreshNeeded() {
+        return engineSafe().refreshNeeded();
+    }
+
+    @Override
+    public boolean possibleMergeNeeded() {
+        return engineSafe().possibleMergeNeeded();
+    }
+
+    @Override
+    public void maybeMerge() throws EngineException {
+        engineSafe().maybeMerge();
+    }
+
+    @Override
+    public void refresh(Refresh refresh) throws EngineException {
+        engineSafe().refresh(refresh);
+    }
+
+    @Override
+    public void flush(Flush flush) throws EngineException, FlushNotAllowedEngineException {
+        engineSafe().flush(flush);
+    }
+
+    @Override
+    public void optimize(Optimize optimize) throws EngineException {
+        engineSafe().optimize(optimize);
+    }
+
+    @Override
+    public SnapshotIndexCommit snapshotIndex() throws EngineException {
+        return engineSafe().snapshotIndex();
+    }
+
+    @Override
+    public void recover(RecoveryHandler recoveryHandler) throws EngineException {
+        engineSafe().recover(recoveryHandler);
+    }
+
+    @Override
+    public void failEngine(String reason, Throwable failure) {
+        engineSafe().failEngine(reason, failure);
+    }
+
+    @Override
+    public ShardId shardId() {
+        return shardId;
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return indexSettings;
+    }
+
+
+    /** return the current indexing buffer size setting * */
+    public ByteSizeValue indexingBufferSize() {
+        return indexingBufferSize;
+    }
+
+
+    // called by the current engine
+    @Override
+    public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
+        for (FailedEngineListener listener : failedEngineListeners) {
+            try {
+                listener.onFailedEngine(shardId, reason, failure);
+            } catch (Exception e) {
+                logger.warn("exception while notifying engine failure", e);
+            }
+        }
+    }
+
+    class ApplySettings implements IndexSettingsService.Listener {
+
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            InternalEngine currentEngine = InternalEngineHolder.this.currentEngine.get();
+            boolean change = false;
+            long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis)).millis();
+            if (gcDeletesInMillis != InternalEngineHolder.this.gcDeletesInMillis) {
+                logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
+                InternalEngineHolder.this.gcDeletesInMillis = gcDeletesInMillis;
+                change = true;
+            }
+
+            final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush);
+            if (compoundOnFlush != InternalEngineHolder.this.compoundOnFlush) {
+                logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush, compoundOnFlush);
+                InternalEngineHolder.this.compoundOnFlush = compoundOnFlush;
+                change = true;
+            }
+
+            final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption);
+            if (failEngineOnCorruption != InternalEngineHolder.this.failEngineOnCorruption) {
+                logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption, failEngineOnCorruption);
+                InternalEngineHolder.this.failEngineOnCorruption = failEngineOnCorruption;
+                change = true;
+            }
+            int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngineHolder.this.indexConcurrency);
+            if (indexConcurrency != InternalEngineHolder.this.indexConcurrency) {
+                logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngineHolder.this.indexConcurrency, indexConcurrency);
+                InternalEngineHolder.this.indexConcurrency = indexConcurrency;
+                // we have to flush in this case, since it only applies on a new index writer
+                change = true;
+            }
+            if (!codecName.equals(InternalEngineHolder.this.codecName)) {
+                logger.info("updating index.codec from [{}] to [{}]", InternalEngineHolder.this.codecName, codecName);
+                InternalEngineHolder.this.codecName = codecName;
+                // we want to flush in this case, so the new codec will be reflected right away...
+                change = true;
+            }
+            if (failOnMergeFailure != InternalEngineHolder.this.failOnMergeFailure) {
+                logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, InternalEngineHolder.this.failOnMergeFailure, failOnMergeFailure);
+                InternalEngineHolder.this.failOnMergeFailure = failOnMergeFailure;
+            }
+            if (change && currentEngine != null) {
+                currentEngine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);
+            }
+        }
+    }
+
+    class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
+        @Override
+        public void onFailedMerge(MergePolicy.MergeException e) {
+            if (Lucene.isCorruptionException(e)) {
+                if (failEngineOnCorruption) {
+                    failEngine("corrupt file detected source: [merge]", e);
+                } else {
+                    logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, INDEX_FAIL_ON_CORRUPTION, failEngineOnCorruption);
+                }
+            } else if (failOnMergeFailure) {
+                failEngine("merge exception", e);
+            }
+        }
+    }
+
+    class MergeScheduleListener implements MergeSchedulerProvider.Listener {
+        private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
+        private final AtomicBoolean isThrottling = new AtomicBoolean();
+
+        @Override
+        public synchronized void beforeMerge(OnGoingMerge merge) {
+            int maxNumMerges = mergeScheduler.getMaxMerges();
+            InternalEngine currentEngineImpl = currentEngine.get();
+            if (numMergesInFlight.incrementAndGet() > maxNumMerges && currentEngineImpl != null) {
+                if (isThrottling.getAndSet(true) == false) {
+                    logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
+                    indexingService.throttlingActivated();
+                    currentEngineImpl.activateThrottling();
+                }
+            }
+        }
+
+        @Override
+        public synchronized void afterMerge(OnGoingMerge merge) {
+            int maxNumMerges = mergeScheduler.getMaxMerges();
+            InternalEngine currentEngineImpl = currentEngine.get();
+            if (numMergesInFlight.decrementAndGet() < maxNumMerges && currentEngineImpl != null) {
+                if (isThrottling.getAndSet(false)) {
+                    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
+                    indexingService.throttlingDeactivated();
+                    currentEngineImpl.deactivateThrottling();
+                }
+            }
+        }
+
+    }
+}

+ 1 - 1
src/main/java/org/elasticsearch/index/engine/internal/InternalEngineModule.java

@@ -29,6 +29,6 @@ public class InternalEngineModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(Engine.class).to(InternalEngine.class).asEagerSingleton();
+        bind(Engine.class).to(InternalEngineHolder.class).asEagerSingleton();
     }
 }

+ 4 - 0
src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java

@@ -76,6 +76,10 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent
         failureListeners.add(listener);
     }
 
+    public void removeFailureListener(FailureListener listener) {
+        failureListeners.remove(listener);
+    }
+
     public void addListener(Listener listener) {
         listeners.add(listener);
     }

+ 7 - 8
src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java

@@ -28,8 +28,7 @@ import org.elasticsearch.cluster.settings.DynamicSettings;
 import org.elasticsearch.cluster.settings.Validator;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.gateway.local.LocalGatewayAllocator;
-import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.engine.internal.InternalEngine;
+import org.elasticsearch.index.engine.internal.InternalEngineHolder;
 import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
 import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
 import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
@@ -83,12 +82,12 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
         indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
         indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_INDEX_CONCURRENCY, Validator.NON_NEGATIVE_INTEGER);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_GC_DELETES, Validator.TIME);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CODEC);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE);
-        indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_CORRUPTION);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, Validator.NON_NEGATIVE_INTEGER);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_GC_DELETES, Validator.TIME);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_CODEC);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE);
+        indexDynamicSettings.addDynamicSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION);
         indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
         indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
         indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);

+ 2 - 2
src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java

@@ -21,8 +21,8 @@ package org.elasticsearch.index.snapshots;
 
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
-import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.recovery.RecoveryState;
 
 /**
  * Shard-level snapshot repository
@@ -35,7 +35,7 @@ public interface IndexShardRepository {
     /**
      * Creates a snapshot of the shard based on the index commit point.
      * <p/>
-     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.internal.InternalEngine#snapshotIndex()} method.
+     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.internal.InternalEngineHolder#snapshotIndex()} method.
      * IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
      * <p/>
      * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check

+ 2 - 2
src/test/java/org/elasticsearch/index/engine/internal/InternalEngineIntegrationTest.java

@@ -43,13 +43,13 @@ public class InternalEngineIntegrationTest extends ElasticsearchIntegrationTest
         refresh();
         assertTotalCompoundSegments(1, 1, "test");
         client().admin().indices().prepareUpdateSettings("test")
-                .setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, false)).get();
+                .setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false)).get();
         client().prepareIndex("test", "foo").setSource("field", "foo").get();
         refresh();
         assertTotalCompoundSegments(1, 2, "test");
 
         client().admin().indices().prepareUpdateSettings("test")
-                .setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true)).get();
+                .setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true)).get();
         client().prepareIndex("test", "foo").setSource("field", "foo").get();
         refresh();
         assertTotalCompoundSegments(2, 3, "test");

+ 2 - 2
src/test/java/org/elasticsearch/index/engine/internal/InternalEngineSettingsTest.java

@@ -30,9 +30,9 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
         final IndexService service = createIndex("foo");
         // INDEX_COMPOUND_ON_FLUSH
         assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
-        client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
+        client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
         assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(false));
-        client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
+        client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
         assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
     }
 

+ 42 - 32
src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java

@@ -119,9 +119,9 @@ public class InternalEngineTests extends ElasticsearchTestCase {
     public void setUp() throws Exception {
         super.setUp();
         defaultSettings = ImmutableSettings.builder()
-                .put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
-                .put(InternalEngine.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
-                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
+                .put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
+                .put(InternalEngineHolder.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
+                .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
                 .build(); // TODO randomize more settings
         threadPool = new ThreadPool(getClass().getName());
         store = createStore();
@@ -134,12 +134,20 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             engine.enableGcDeletes(false);
         }
         engine.start();
+        if (randomBoolean()) {
+            engine.stop();
+            engine.start();
+        }
         replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
         replicaEngine = createEngine(replicaSettingsService, storeReplica, createTranslogReplica());
         if (randomBoolean()) {
             replicaEngine.enableGcDeletes(false);
         }
         replicaEngine.start();
+        if (randomBoolean()) {
+            engine.stop();
+            engine.start();
+        }
     }
 
     @After
@@ -176,7 +184,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
             @Override
             public Directory[] build() throws IOException {
-                return new Directory[] {new RAMDirectory() } ;
+                return new Directory[]{new RAMDirectory()};
             }
 
             @Override
@@ -192,7 +200,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
             @Override
             public Directory[] build() throws IOException {
-                return new Directory[] {new RAMDirectory() } ;
+                return new Directory[]{new RAMDirectory()};
             }
 
             @Override
@@ -232,7 +240,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
     }
 
     protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
-        return new InternalEngine(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
+        return new InternalEngineHolder(shardId, defaultSettings, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
                 new AnalysisService(shardId.index(), indexSettingsService.getSettings()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
     }
 
@@ -245,7 +253,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         List<Segment> segments = engine.segments();
         assertThat(segments.isEmpty(), equalTo(true));
         assertThat(engine.segmentsStats().getCount(), equalTo(0l));
-        final boolean defaultCompound = defaultSettings.getAsBoolean(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true);
+        final boolean defaultCompound = defaultSettings.getAsBoolean(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true);
 
         // create a doc and refresh
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
@@ -275,7 +283,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
         assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
 
-        engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, false).build());
+        engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, false).build());
 
         ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
         engine.create(new Engine.Create(null, newUid("3"), doc3));
@@ -318,7 +326,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
         assertThat(segments.get(1).isCompound(), equalTo(false));
 
-        engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
+        engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true).build());
         ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
         engine.create(new Engine.Create(null, newUid("4"), doc4));
         engine.refresh(new Engine.Refresh("test").force(false));
@@ -365,7 +373,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             }
         };
         thread.start();
-        while(startPending.get()) {
+        while (startPending.get()) {
             try {
                 engine.acquireSearcher("foobar").close();
                 break;
@@ -459,7 +467,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             assertThat(segment.getMergeId(), nullValue());
         }
         waitForMerge.get().countDown();
-        
+
         if (flush) {
             awaitBusy(new Predicate<Object>() {
                 @Override
@@ -671,21 +679,21 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
         searchResult.close();
     }
-    
+
     @Test
     public void testFailEngineOnCorruption() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
         engine.create(new Engine.Create(null, newUid("1"), doc));
         engine.flush(new Engine.Flush());
-        final boolean failEngine = defaultSettings.getAsBoolean(InternalEngine.INDEX_FAIL_ON_CORRUPTION, false);
-        final int failInPhase = randomIntBetween(1,3);
+        final boolean failEngine = defaultSettings.getAsBoolean(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, false);
+        final int failInPhase = randomIntBetween(1, 3);
         try {
             engine.recover(new Engine.RecoveryHandler() {
                 @Override
                 public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
-                   if (failInPhase == 1) {
-                       throw new RuntimeException("bar", new CorruptIndexException("Foo", "fake file description"));
-                   }
+                    if (failInPhase == 1) {
+                        throw new RuntimeException("bar", new CorruptIndexException("Foo", "fake file description"));
+                    }
                 }
 
                 @Override
@@ -1255,13 +1263,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
 
     private static class MockAppender extends AppenderSkeleton {
         public boolean sawIndexWriterMessage;
+
         public boolean sawIndexWriterIFDMessage;
 
         @Override
         protected void append(LoggingEvent event) {
             if (event.getLevel() == Level.TRACE && event.getMessage().toString().contains("[index][1] ")) {
                 if (event.getLoggerName().endsWith("lucene.iw") &&
-                    event.getMessage().toString().contains("IW: apply all deletes during flush")) {
+                        event.getMessage().toString().contains("IW: apply all deletes during flush")) {
                     sawIndexWriterMessage = true;
                 }
                 if (event.getLoggerName().endsWith("lucene.iw.ifd")) {
@@ -1296,13 +1305,13 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             // First, with DEBUG, which should NOT log IndexWriter output:
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
             engine.create(new Engine.Create(null, newUid("1"), doc));
-            engine.flush(new Engine.Flush());        
+            engine.flush(new Engine.Flush());
             assertFalse(mockAppender.sawIndexWriterMessage);
 
             // Again, with TRACE, which should log IndexWriter output:
             rootLogger.setLevel(Level.TRACE);
             engine.create(new Engine.Create(null, newUid("2"), doc));
-            engine.flush(new Engine.Flush());        
+            engine.flush(new Engine.Flush());
             assertTrue(mockAppender.sawIndexWriterMessage);
 
         } finally {
@@ -1315,7 +1324,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
     public void testIndexWriterIFDInfoStream() {
         MockAppender mockAppender = new MockAppender();
 
-        Logger iwIFDLogger = Logger.getLogger("lucene.iw.ifd");
+        Logger iwIFDLogger = Logger.getLogger("index.engine.internal.lucene.iw.ifd");
         Level savedLevel = iwIFDLogger.getLevel();
         iwIFDLogger.addAppender(mockAppender);
         iwIFDLogger.setLevel(Level.DEBUG);
@@ -1324,14 +1333,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             // First, with DEBUG, which should NOT log IndexWriter output:
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
             engine.create(new Engine.Create(null, newUid("1"), doc));
-            engine.flush(new Engine.Flush());        
+            engine.flush(new Engine.Flush());
             assertFalse(mockAppender.sawIndexWriterMessage);
             assertFalse(mockAppender.sawIndexWriterIFDMessage);
 
             // Again, with TRACE, which should only log IndexWriter IFD output:
             iwIFDLogger.setLevel(Level.TRACE);
             engine.create(new Engine.Create(null, newUid("2"), doc));
-            engine.flush(new Engine.Flush());        
+            engine.flush(new Engine.Flush());
             assertFalse(mockAppender.sawIndexWriterMessage);
             assertTrue(mockAppender.sawIndexWriterIFDMessage);
 
@@ -1350,15 +1359,16 @@ public class InternalEngineTests extends ElasticsearchTestCase {
 
         // Make sure enableGCDeletes == false works:
         Settings settings = ImmutableSettings.builder()
-                .put(InternalEngine.INDEX_GC_DELETES, "0ms")
+                .put(InternalEngineHolder.INDEX_GC_DELETES, "0ms")
                 .build();
 
-        Engine engine = new InternalEngine(shardId, settings, threadPool,
-                                           engineSettingsService,
-                                           new ShardIndexingService(shardId, settings,
-                                                                    new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, engineSettingsService)),
-                                           null, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy(), createMergeScheduler(engineSettingsService),
-                                           new AnalysisService(shardId.index(), engineSettingsService.getSettings()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
+        Engine engine = new InternalEngineHolder(shardId, settings, threadPool,
+                engineSettingsService,
+                new ShardIndexingService(shardId, settings,
+                        new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, engineSettingsService)),
+                null, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy(), createMergeScheduler(engineSettingsService),
+                new AnalysisService(shardId.index(), engineSettingsService.getSettings()), new SimilarityService(shardId.index()),
+                new CodecService(shardId.index()));
         engine.start();
         engine.enableGcDeletes(false);
 
@@ -1371,7 +1381,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
 
         // Delete document we just added:
         engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
-        
+
         // Get should not find the document
         Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
         assertThat(getResult.exists(), equalTo(false));
@@ -1399,7 +1409,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         }
 
         // Get should still not find the document
-        getResult = engine.get(new Engine.Get(true, newUid("1")));        
+        getResult = engine.get(new Engine.Get(true, newUid("1")));
         assertThat(getResult.exists(), equalTo(false));
 
         // Try to index uid=2 with a too-old version, should fail:

+ 34 - 39
src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java

@@ -26,10 +26,7 @@ import com.google.common.base.Predicate;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.*;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -48,14 +45,12 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDeci
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.index.engine.internal.InternalEngine;
-import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
+import org.elasticsearch.index.engine.internal.InternalEngineHolder;
 import org.elasticsearch.index.merge.policy.MergePolicyModule;
 import org.elasticsearch.index.shard.IndexShardException;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -98,7 +93,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                 // and we need to make sure primaries are not just trashed if we don't have replicas
                 .put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local")
                 .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
-                // speed up recoveries
+                        // speed up recoveries
                 .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, 10)
                 .put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, 10)
                 .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 5)
@@ -124,16 +119,16 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
 
         final boolean failOnCorruption = randomBoolean();
         assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
-                .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
-                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
-                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
-                .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
-                .put("indices.recovery.concurrent_streams", 10)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
+                        .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
+                        .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
+                        .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
+                        .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
+                        .put("indices.recovery.concurrent_streams", 10)
         ));
         if (failOnCorruption == false) { // test the dynamic setting
             client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder()
-                    .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)).get();
+                    .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)).get();
         }
         ensureGreen();
         disableAllocation("test");
@@ -234,12 +229,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
 
         assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
-                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
-                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
-                .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
-                .put("indices.recovery.concurrent_streams", 10)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                        .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
+                        .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
+                        .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
+                        .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
+                        .put("indices.recovery.concurrent_streams", 10)
         ));
         ensureGreen();
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -277,7 +272,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         }
         assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
         ClusterState state = client().admin().cluster().prepareState().get().getState();
-        GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[] {"test"}, false);
+        GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
         for (ShardIterator iterator : shardIterators) {
             ShardRouting routing;
             while ((routing = iterator.nextOrNull()) != null) {
@@ -325,13 +320,13 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
 
 
         assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
-                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
-                // This does corrupt files on the replica, so we can't check:
-                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
-                .put("index.routing.allocation.include._name", primariesNode.getNode().name())
-                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
+                        .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
+                                // This does corrupt files on the replica, so we can't check:
+                        .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
+                        .put("index.routing.allocation.include._name", primariesNode.getNode().name())
+                        .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
         ));
         ensureGreen();
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -354,7 +349,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                     if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
                         RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                         if (truncate && req.length() > 1) {
-                            BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int)req.length()-1);
+                            BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1);
                             request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk());
                         } else {
                             byte[] array = req.content().array();
@@ -409,12 +404,12 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
 
         assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
-                .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
-                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
-                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
-                .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
-                .put("indices.recovery.concurrent_streams", 10)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
+                        .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
+                        .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
+                        .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, true)
+                        .put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, true) // no translog based flush - it might change the .liv / segments.N files
+                        .put("indices.recovery.concurrent_streams", 10)
         ));
         ensureGreen();
         IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -467,7 +462,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
     private ShardRouting corruptRandomFile(final boolean includePerCommitFiles) throws IOException {
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
-        List<ShardIterator>  iterators = Lists.newArrayList(shardIterators);
+        List<ShardIterator> iterators = Lists.newArrayList(shardIterators);
         ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), iterators);
         ShardRouting shardRouting = shardIterator.nextOrNull();
         assertNotNull(shardRouting);
@@ -478,7 +473,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         Set<File> files = new TreeSet<>(); // treeset makes sure iteration order is deterministic
         for (FsStats.Info info : nodeStatses.getNodes()[0].getFs()) {
             String path = info.getPath();
-            final String relativeDataLocationPath =  "indices/test/" + Integer.toString(shardRouting.getId()) + "/index";
+            final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/index";
             File file = new File(path, relativeDataLocationPath);
             files.addAll(Arrays.asList(file.listFiles(new FileFilter() {
                 @Override
@@ -500,7 +495,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                     checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
                 }
                 try (RandomAccessFile raf = new RandomAccessFile(fileToCorrupt, "rw")) {
-                    raf.seek(randomIntBetween(0, (int)Math.min(Integer.MAX_VALUE, raf.length()-1)));
+                    raf.seek(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.length() - 1)));
                     long filePointer = raf.getFilePointer();
                     byte b = raf.readByte();
                     raf.seek(filePointer);
@@ -526,7 +521,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
                 msg.append(" file: ").append(fileToCorrupt.getName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getName()));
                 logger.info(msg.toString());
                 assumeTrue("Checksum collision - " + msg.toString(),
-                                checksumAfterCorruption != checksumBeforeCorruption // collision
+                        checksumAfterCorruption != checksumBeforeCorruption // collision
                                 || actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
             }
         }

+ 13 - 13
src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java

@@ -26,7 +26,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.internal.InternalEngine;
+import org.elasticsearch.index.engine.internal.InternalEngineHolder;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
@@ -58,15 +58,15 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
         boolean success = awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object input) {
-                return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize &&
-                        ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize;
+                return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize &&
+                        ((InternalEngineHolder) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize;
             }
         });
 
         if (!success) {
             fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" +
-                            ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2  [" +
-                            ((InternalEngine) shard2.engine()).indexingBufferSize().bytes() + "]"
+                            ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "] shard2  [" +
+                            ((InternalEngineHolder) shard2.engine()).indexingBufferSize().bytes() + "]"
             );
         }
 
@@ -74,13 +74,13 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
         success = awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object input) {
-                return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize;
+                return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize;
             }
         });
 
         if (!success) {
             fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" +
-                            ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
+                            ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
             );
         }
 
@@ -99,12 +99,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
         boolean success = awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object input) {
-                return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
+                return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
             }
         });
         if (!success) {
             fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
-                            ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
+                            ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
             );
         }
 
@@ -113,12 +113,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
         success = awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object input) {
-                return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
+                return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
             }
         });
         if (!success) {
             fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
-                            ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
+                            ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
             );
         }
 
@@ -127,12 +127,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
         success = awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object input) {
-                return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
+                return ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
             }
         });
         if (!success) {
             fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
-                            ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
+                            ((InternalEngineHolder) shard1.engine()).indexingBufferSize().bytes() + "]"
             );
         }
     }

+ 14 - 15
src/test/java/org/elasticsearch/indices/memory/breaker/RandomExceptionCircuitBreakerTests.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.indices.memory.breaker;
 
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReader;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -37,7 +37,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.elasticsearch.test.engine.MockInternalEngine;
+import org.elasticsearch.test.engine.MockInternalEngineHolder;
 import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
 import org.junit.Test;
 
@@ -74,7 +74,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
                 .endObject() // fielddata
                 .endObject() // test-str
                 .startObject("test-num")
-                // I don't use randomNumericType() here because I don't want "byte", and I want "float" and "double"
+                        // I don't use randomNumericType() here because I don't want "byte", and I want "float" and "double"
                 .field("type", randomFrom(Arrays.asList("float", "long", "double", "short", "integer")))
                 .startObject("fielddata")
                 .field("format", randomNumericFieldDataFormat())
@@ -89,15 +89,15 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
         if (frequently()) {
             if (randomBoolean()) {
                 if (randomBoolean()) {
-                    lowLevelRate =  1.0/between(2, 10);
+                    lowLevelRate = 1.0 / between(2, 10);
                     topLevelRate = 0.0d;
                 } else {
-                    topLevelRate =  1.0/between(2, 10);
+                    topLevelRate = 1.0 / between(2, 10);
                     lowLevelRate = 0.0d;
                 }
             } else {
-                lowLevelRate =  1.0/between(2, 10);
-                topLevelRate =  1.0/between(2, 10);
+                lowLevelRate = 1.0 / between(2, 10);
+                topLevelRate = 1.0 / between(2, 10);
             }
         } else {
             // rarely no exception
@@ -107,10 +107,10 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
 
         ImmutableSettings.Builder settings = settingsBuilder()
                 .put(indexSettings())
-                .put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
+                .put(MockInternalEngineHolder.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
                 .put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
                 .put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
-                .put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
+                .put(MockInternalEngineHolder.WRAP_READER_RATIO, 1.0d);
         logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
         client().admin().indices().prepareCreate("test")
                 .setSettings(settings)
@@ -128,7 +128,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
         } else {
             numDocs = between(10, 100);
         }
-        for (int i = 0; i < numDocs ; i++) {
+        for (int i = 0; i < numDocs; i++) {
             try {
                 client().prepareIndex("test", "type", "" + i)
                         .setTimeout(TimeValue.timeValueSeconds(1)).setSource("test-str", randomUnicodeOfLengthBetween(5, 25), "test-num", i).get();
@@ -150,7 +150,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
 
         for (int i = 0; i < numSearches; i++) {
             SearchRequestBuilder searchRequestBuilder = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery());
-            switch(randomIntBetween(0, 5)) {
+            switch (randomIntBetween(0, 5)) {
                 case 5:
                 case 4:
                 case 3:
@@ -179,7 +179,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
                 ensureGreen("test");  // make sure all shards are there - there could be shards that are still starting up.
                 assertAllSuccessful(client().admin().indices().prepareClearCache("test").setFieldDataCache(true).execute().actionGet());
                 NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
-                    .clear().setBreaker(true).execute().actionGet();
+                        .clear().setBreaker(true).execute().actionGet();
                 for (NodeStats stats : nodeStats.getNodes()) {
                     assertThat("Breaker reset to 0 last search success: " + success + " mapping: " + mapping,
                             stats.getBreaker().getStats(CircuitBreaker.Name.FIELDDATA).getEstimated(), equalTo(0L));
@@ -189,13 +189,13 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
     }
 
 
-
     public static final String EXCEPTION_TOP_LEVEL_RATIO_KEY = "index.engine.exception.ratio.top";
     public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
 
     // TODO: Generalize this class and add it as a utility
-    public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
+    public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngineHolder.DirectoryReaderWrapper {
         private final Settings settings;
+
         static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {
             private final Random random;
             private final double topLevelRatio;
@@ -252,7 +252,6 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
         }
 
 
-
         public RandomExceptionDirectoryReaderWrapper(DirectoryReader in, Settings settings) {
             super(in, new ThrowingSubReaderWrapper(settings));
             this.settings = settings;

+ 33 - 36
src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java

@@ -19,9 +19,8 @@
 
 package org.elasticsearch.search.basic;
 
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.util.English;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -36,9 +35,8 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.elasticsearch.test.engine.MockInternalEngine;
+import org.elasticsearch.test.engine.MockInternalEngineHolder;
 import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockDirectoryHelper;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.hamcrest.Matchers;
@@ -49,39 +47,37 @@ import java.util.Random;
 import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
 
 public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTest {
-    
+
     @Test
     public void testRandomDirectoryIOExceptions() throws IOException, InterruptedException, ExecutionException {
         String mapping = XContentFactory.jsonBuilder().
                 startObject().
-                    startObject("type").
-                        startObject("properties").
-                            startObject("test")
-                                .field("type", "string")
-                                .field("index", "not_analyzed")
-                            .endObject().
+                startObject("type").
+                startObject("properties").
+                startObject("test")
+                .field("type", "string")
+                .field("index", "not_analyzed")
+                .endObject().
                         endObject().
-                    endObject()
+                        endObject()
                 .endObject().string();
         final double exceptionRate;
         final double exceptionOnOpenRate;
         if (frequently()) {
             if (randomBoolean()) {
                 if (randomBoolean()) {
-                    exceptionOnOpenRate =  1.0/between(5, 100);
+                    exceptionOnOpenRate = 1.0 / between(5, 100);
                     exceptionRate = 0.0d;
                 } else {
-                    exceptionRate =  1.0/between(5, 100);
+                    exceptionRate = 1.0 / between(5, 100);
                     exceptionOnOpenRate = 0.0d;
                 }
             } else {
-                exceptionOnOpenRate =  1.0/between(5, 100);
-                exceptionRate =  1.0/between(5, 100);
+                exceptionOnOpenRate = 1.0 / between(5, 100);
+                exceptionRate = 1.0 / between(5, 100);
             }
         } else {
             // rarely no exception
@@ -101,7 +97,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
                     .addMapping("type", mapping).execute().actionGet();
             numInitialDocs = between(10, 100);
             ensureGreen();
-            for (int i = 0; i < numInitialDocs ; i++) {
+            for (int i = 0; i < numInitialDocs; i++) {
                 client().prepareIndex("test", "type", "init" + i).setSource("test", "init").get();
             }
             client().admin().indices().prepareRefresh("test").execute().get();
@@ -113,10 +109,10 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
             client().admin().indices().prepareOpen("test").execute().get();
         } else {
             Builder settings = settingsBuilder()
-            .put("index.number_of_replicas", randomIntBetween(0, 1))
-            .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
-            .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
-            .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
+                    .put("index.number_of_replicas", randomIntBetween(0, 1))
+                    .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
+                    .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
+                    .put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
             logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
             client().admin().indices().prepareCreate("test")
                     .setSettings(settings)
@@ -140,7 +136,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
         }
         long numCreated = 0;
         boolean[] added = new boolean[numDocs];
-        for (int i = 0; i < numDocs ; i++) {
+        for (int i = 0; i < numDocs; i++) {
             added[i] = false;
             try {
                 IndexResponse indexResponse = client().prepareIndex("test", "type", Integer.toString(i)).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", English.intToEnglish(i)).get();
@@ -161,7 +157,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
         // we don't check anything here really just making sure we don't leave any open files or a broken index behind.
         for (int i = 0; i < numSearches; i++) {
             try {
-                int docToQuery = between(0, numDocs-1);
+                int docToQuery = between(0, numDocs - 1);
                 long expectedResults = added[docToQuery] ? 1 : 0;
                 logger.info("Searching for [test:{}]", English.intToEnglish(docToQuery));
                 SearchResponse searchResponse = client().prepareSearch().setTypes("type").setQuery(QueryBuilders.matchQuery("test", English.intToEnglish(docToQuery))).get();
@@ -216,15 +212,15 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
         if (frequently()) {
             if (randomBoolean()) {
                 if (randomBoolean()) {
-                    lowLevelRate =  1.0/between(2, 10);
+                    lowLevelRate = 1.0 / between(2, 10);
                     topLevelRate = 0.0d;
                 } else {
-                    topLevelRate =  1.0/between(2, 10);
+                    topLevelRate = 1.0 / between(2, 10);
                     lowLevelRate = 0.0d;
                 }
             } else {
-                lowLevelRate =  1.0/between(2, 10);
-                topLevelRate =  1.0/between(2, 10);
+                lowLevelRate = 1.0 / between(2, 10);
+                topLevelRate = 1.0 / between(2, 10);
             }
         } else {
             // rarely no exception
@@ -234,10 +230,10 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
 
         Builder settings = settingsBuilder()
                 .put(indexSettings())
-                .put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
+                .put(MockInternalEngineHolder.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
                 .put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
                 .put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
-                .put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
+                .put(MockInternalEngineHolder.WRAP_READER_RATIO, 1.0d);
         logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
         assertAcked(prepareCreate("test")
                 .setSettings(settings)
@@ -246,7 +242,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
         final int numDocs = between(10, 100);
         long numCreated = 0;
         boolean[] added = new boolean[numDocs];
-        for (int i = 0; i < numDocs ; i++) {
+        for (int i = 0; i < numDocs; i++) {
             try {
                 IndexResponse indexResponse = client().prepareIndex("test", "type", "" + i).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", English.intToEnglish(i)).get();
                 if (indexResponse.isCreated()) {
@@ -266,7 +262,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
         // we don't check anything here really just making sure we don't leave any open files or a broken index behind.
         for (int i = 0; i < numSearches; i++) {
             try {
-                int docToQuery = between(0, numDocs-1);
+                int docToQuery = between(0, numDocs - 1);
                 long expectedResults = added[docToQuery] ? 1 : 0;
                 logger.info("Searching for [test:{}]", English.intToEnglish(docToQuery));
                 SearchResponse searchResponse = client().prepareSearch().setQuery(QueryBuilders.matchQuery("test", English.intToEnglish(docToQuery))).get();
@@ -292,10 +288,11 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
     public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
 
 
-    public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
+    public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngineHolder.DirectoryReaderWrapper {
         private final Settings settings;
+
         static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {
-             private final Random random;
+            private final Random random;
             private final double topLevelRatio;
             private final double lowLevelRatio;
 

+ 2 - 1
src/test/java/org/elasticsearch/test/ElasticsearchSingleNodeTest.java

@@ -36,6 +36,7 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.engine.internal.InternalEngine;
+import org.elasticsearch.index.engine.internal.InternalEngineHolder;
 import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -193,7 +194,7 @@ public abstract class ElasticsearchSingleNodeTest extends ElasticsearchTestCase
     }
 
     protected static InternalEngine engine(IndexService service) {
-       return ((InternalEngine)((InternalIndexShard)service.shard(0)).engine());
+        return ((InternalEngineHolder) ((InternalIndexShard) service.shard(0)).engine()).engineSafe();
     }
 
     /**

+ 1 - 1
src/test/java/org/elasticsearch/test/engine/MockEngineModule.java

@@ -26,6 +26,6 @@ public class MockEngineModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(Engine.class).to(MockInternalEngine.class).asEagerSingleton();
+        bind(Engine.class).to(MockInternalEngineHolder.class).asEagerSingleton();
     }
 }

+ 24 - 59
src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java

@@ -16,72 +16,56 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.elasticsearch.test.engine;
 
-import org.apache.lucene.index.AssertingDirectoryReader;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.AssertingIndexSearcher;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.SearcherManager;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
-import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.internal.InternalEngine;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.warmer.IndicesWarmer;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.lang.reflect.Constructor;
-import java.util.Map.Entry;
-import java.util.Random;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-public final class MockInternalEngine extends InternalEngine implements Engine {
+public class MockInternalEngine extends InternalEngine {
+
     public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
-    public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
-    public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
-
-    private final Random random;
-    private final boolean wrapReader;
-    private final Class<? extends FilterDirectoryReader> wrapper;
-
-    @Inject
-    public MockInternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
-                              IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store,
-                              SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
-                              MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
-                              CodecService codecService) throws EngineException {
-        super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store,
-                deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService, codecService);
-        final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
-        random = new Random(seed);
-        final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
-        wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
-        wrapReader = random.nextDouble() < ratio;
-        if (logger.isTraceEnabled()) {
-            logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
-        }
-    }
 
+    private MockInternalEngineHolder.MockContext mockContext;
+
+    public MockInternalEngine(MockInternalEngineHolder.MockContext mockContext, ShardId shardId, ESLogger logger, CodecService codecService,
+                              ThreadPool threadPool, ShardIndexingService indexingService,
+                              @Nullable IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
+                              MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, AnalysisService analysisService,
+                              SimilarityService similarityService, boolean enableGcDeletes, long gcDeletesInMillis, ByteSizeValue indexingBufferSize, String codecName,
+                              boolean compoundOnFlush, int indexConcurrency, boolean optimizeAutoGenerateId, boolean failEngineOnCorruption,
+                              FailedEngineListener failedEngineListener) throws EngineException {
+        super(shardId, logger, codecService, threadPool, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider,
+                mergeScheduler, analysisService, similarityService, enableGcDeletes, gcDeletesInMillis, indexingBufferSize, codecName,
+                compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, failedEngineListener);
+        this.mockContext = mockContext;
+    }
 
     @Override
     public void close() {
@@ -90,7 +74,7 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
         } finally {
             if (logger.isTraceEnabled()) {
                 // log debug if we have pending searchers
-                for (Entry<MockInternalEngine.AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
+                for (Map.Entry<MockInternalEngine.AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
                     logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId);
                 }
             }
@@ -103,11 +87,11 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
 
         IndexReader reader = searcher.getIndexReader();
         IndexReader wrappedReader = reader;
-        if (reader instanceof DirectoryReader && wrapReader) {
+        if (reader instanceof DirectoryReader && mockContext.wrapReader) {
             wrappedReader = wrapReader((DirectoryReader) reader);
         }
         // this executes basic query checks and asserts that weights are normalized only once etc.
-        final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(random, wrappedReader);
+        final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
         assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
         // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
         // be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
@@ -117,7 +101,7 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
 
     private DirectoryReader wrapReader(DirectoryReader reader) {
         try {
-            Constructor<?>[] constructors = wrapper.getConstructors();
+            Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
             Constructor<?> nonRandom = null;
             for (Constructor<?> constructor : constructors) {
                 Class<?>[] parameterTypes = constructor.getParameterTypes();
@@ -126,7 +110,7 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
                         nonRandom = constructor;
                     } else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
 
-                       return (DirectoryReader) constructor.newInstance(reader, indexSettings);
+                        return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
                     }
                 }
             }
@@ -205,23 +189,4 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
         }
     }
 
-    public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
-        protected final SubReaderWrapper subReaderWrapper;
-
-        public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) {
-            super(in, subReaderWrapper);
-            this.subReaderWrapper = subReaderWrapper;
-        }
-
-        @Override
-        public Object getCoreCacheKey() {
-            return in.getCoreCacheKey();
-        }
-
-        @Override
-        public Object getCombinedCoreAndDeletesKey() {
-            return in.getCombinedCoreAndDeletesKey();
-        }
-
-    }
 }

+ 119 - 0
src/test/java/org/elasticsearch/test/engine/MockInternalEngineHolder.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.engine;
+
+import org.apache.lucene.index.AssertingDirectoryReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FilterDirectoryReader;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.internal.InternalEngine;
+import org.elasticsearch.index.engine.internal.InternalEngineHolder;
+import org.elasticsearch.index.indexing.ShardIndexingService;
+import org.elasticsearch.index.merge.policy.MergePolicyProvider;
+import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.similarity.SimilarityService;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.warmer.IndicesWarmer;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Random;
+
+public final class MockInternalEngineHolder extends InternalEngineHolder implements Engine {
+    public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
+    public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
+
+    public static class MockContext {
+        public final Random random;
+        public final boolean wrapReader;
+        public final Class<? extends FilterDirectoryReader> wrapper;
+        public final Settings indexSettings;
+
+        public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
+            this.random = random;
+            this.wrapReader = wrapReader;
+            this.wrapper = wrapper;
+            this.indexSettings = indexSettings;
+        }
+    }
+
+    MockContext mockContext;
+
+    @Inject
+    public MockInternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
+                                    IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store,
+                                    SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
+                                    MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
+                                    CodecService codecService) throws EngineException {
+        super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService,
+                similarityService, codecService
+        );
+        final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
+        Random random = new Random(seed);
+        final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
+        Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
+        boolean wrapReader = random.nextDouble() < ratio;
+        if (logger.isTraceEnabled()) {
+            logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
+        }
+        mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
+
+    }
+
+    @Override
+    protected InternalEngine createEngineImpl() {
+        return new MockInternalEngine(mockContext, shardId, logger, codecService, threadPool, indexingService,
+                warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService,
+                enableGcDeletes, gcDeletesInMillis,
+                indexingBufferSize, codecName, compoundOnFlush, indexConcurrency, optimizeAutoGenerateId, failEngineOnCorruption, this);
+    }
+
+    public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
+        protected final SubReaderWrapper subReaderWrapper;
+
+        public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) {
+            super(in, subReaderWrapper);
+            this.subReaderWrapper = subReaderWrapper;
+        }
+
+        @Override
+        public Object getCoreCacheKey() {
+            return in.getCoreCacheKey();
+        }
+
+        @Override
+        public Object getCombinedCoreAndDeletesKey() {
+            return in.getCombinedCoreAndDeletesKey();
+        }
+
+    }
+
+}

+ 11 - 10
src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java

@@ -185,6 +185,7 @@ public class ElasticsearchAssertions {
         }
         assertVersionSerializable(countResponse);
     }
+
     public static void assertExists(ExistsResponse existsResponse, boolean expected) {
         if (existsResponse.exists() != expected) {
             fail("Exist is " + existsResponse.exists() + " but " + expected + " was expected " + formatShardStatus(existsResponse));
@@ -261,14 +262,14 @@ public class ElasticsearchAssertions {
                 assertThat(shardSearchFailure.reason(), reasonMatcher);
             }
             assertVersionSerializable(searchResponse);
-        } catch(SearchPhaseExecutionException e) {
+        } catch (SearchPhaseExecutionException e) {
             assertThat(e.status(), equalTo(restStatus));
             assertThat(e.getMessage(), reasonMatcher);
             for (ShardSearchFailure shardSearchFailure : e.shardFailures()) {
                 assertThat(shardSearchFailure.status(), equalTo(restStatus));
                 assertThat(shardSearchFailure.reason(), reasonMatcher);
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
             fail("SearchPhaseExecutionException expected but got " + e.getClass());
         }
     }
@@ -581,10 +582,10 @@ public class ElasticsearchAssertions {
             Streamable newInstance = tryCreateNewInstance(streamable);
             if (newInstance == null) {
                 return; // can't create a new instance - we never modify a
-                        // streamable that comes in.
+                // streamable that comes in.
             }
             if (streamable instanceof ActionRequest) {
-                ((ActionRequest<?>)streamable).validate();
+                ((ActionRequest<?>) streamable).validate();
             }
             BytesReference orig = serialize(version, streamable);
             StreamInput input = new BytesStreamInput(orig);
@@ -691,12 +692,12 @@ public class ElasticsearchAssertions {
     }
 
     public static void assertNodeContainsPlugins(NodesInfoResponse response, String nodeId,
-                                           List<String> expectedJvmPluginNames,
-                                           List<String> expectedJvmPluginDescriptions,
-                                           List<String> expectedJvmVersions,
-                                           List<String> expectedSitePluginNames,
-                                           List<String> expectedSitePluginDescriptions,
-                                           List<String> expectedSiteVersions) {
+                                                 List<String> expectedJvmPluginNames,
+                                                 List<String> expectedJvmPluginDescriptions,
+                                                 List<String> expectedJvmVersions,
+                                                 List<String> expectedSitePluginNames,
+                                                 List<String> expectedSitePluginDescriptions,
+                                                 List<String> expectedSiteVersions) {
 
         Assert.assertThat(response.getNodesMap().get(nodeId), notNullValue());