Merge pull request #14121 from mikemccand/fair_indexing_buffers

The indexing buffer on a node (default: 10% of the JVM heap) is now a "shared pool" across all shards on that node. This way, shards doing intense indexing can use much more of the buffer than shards doing only light indexing, and only once the sum of indexing-buffer usage across all shards exceeds the node's indexing buffer will we ask shards to move recently indexed documents to segments on disk.
Michael McCandless · commit b4a095d430 · 9 years ago
22 changed files with 716 additions and 655 deletions
  1. core/src/main/java/org/elasticsearch/cluster/ClusterModule.java (+0 -1)
  2. core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java (+0 -12)
  3. core/src/main/java/org/elasticsearch/index/IndexModule.java (+5 -2)
  4. core/src/main/java/org/elasticsearch/index/IndexService.java (+21 -13)
  5. core/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java (+1 -1)
  6. core/src/main/java/org/elasticsearch/index/engine/Engine.java (+20 -3)
  7. core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java (+6 -57)
  8. core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java (+58 -46)
  9. core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java (+18 -2)
  10. core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java (+0 -8)
  11. core/src/main/java/org/elasticsearch/index/shard/IndexShard.java (+120 -114)
  12. core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java (+1 -1)
  13. core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java (+224 -83)
  14. core/src/main/java/org/elasticsearch/indices/IndicesService.java (+1 -2)
  15. core/src/test/java/org/elasticsearch/index/IndexModuleTests.java (+1 -0)
  16. core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java (+0 -46)
  17. core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java (+5 -42)
  18. core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java (+4 -3)
  19. core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java (+0 -99)
  20. core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java (+225 -111)
  21. docs/reference/migration/migrate_3_0.asciidoc (+5 -1)
  22. docs/reference/modules/indices/indexing_buffer.asciidoc (+1 -8)
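
As context for the per-file diffs below, the overall approach can be summarized with a small standalone sketch (this is not the PR's actual code; NodeIndexingBufferController and ShardBuffer are hypothetical names): a node-level controller periodically sums the indexing-buffer heap held by every shard on the node and, once the total exceeds the node-wide budget, asks the heaviest consumers to move their buffered documents to segments on disk.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Simplified sketch of the node-wide "shared pool" idea; not the Elasticsearch implementation. */
final class NodeIndexingBufferController {

    /** Minimal view of a shard for this sketch: how much indexing heap it holds and how to free it. */
    interface ShardBuffer {
        long indexBufferBytesUsed();   // heap currently held by the shard's indexing buffer
        void writeIndexingBuffer();    // move buffered documents to segments on disk
    }

    private final long nodeBudgetBytes; // e.g. 10% of the JVM heap

    NodeIndexingBufferController(long nodeBudgetBytes) {
        this.nodeBudgetBytes = nodeBudgetBytes;
    }

    /** Called periodically; the PR checks every 5 seconds by default. */
    void check(List<ShardBuffer> shards) {
        long totalBytes = 0;
        for (ShardBuffer shard : shards) {
            totalBytes += shard.indexBufferBytesUsed();
        }
        if (totalBytes <= nodeBudgetBytes) {
            return; // under budget: heavily indexing shards may keep using more than an equal share
        }
        // Over budget: write out the largest buffers first until we are back under the limit.
        List<ShardBuffer> byUsage = new ArrayList<>(shards);
        byUsage.sort(Comparator.comparingLong(ShardBuffer::indexBufferBytesUsed).reversed());
        for (ShardBuffer shard : byUsage) {
            if (totalBytes <= nodeBudgetBytes) {
                break;
            }
            long freed = shard.indexBufferBytesUsed();
            shard.writeIndexingBuffer();
            totalBytes -= freed;
        }
    }
}

The real IndexingMemoryController in the diffs below additionally tracks bytes that are already being written and throttles shards when writing cannot keep up with incoming indexing.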

+ 0 - 1
core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -152,7 +152,6 @@ public class ClusterModule extends AbstractModule {
         registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY);
         registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
         registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
-        registerIndexDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);

+ 0 - 12
core/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

@@ -125,18 +125,6 @@ final class CompositeIndexEventListener implements IndexEventListener {
         }
     }
 
-    @Override
-    public void onShardActive(IndexShard indexShard) {
-        for (IndexEventListener listener : listeners) {
-            try {
-                listener.onShardActive(indexShard);
-            } catch (Throwable t) {
-                logger.warn("[{}] failed to invoke on shard active callback", t, indexShard.shardId().getId());
-                throw t;
-            }
-        }
-    }
-
     @Override
     public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
         for (IndexEventListener listener : listeners) {

+ 5 - 2
core/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -29,11 +29,13 @@ import org.elasticsearch.index.cache.query.none.NoneQueryCache;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
+import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.similarity.BM25SimilarityProvider;
 import org.elasticsearch.index.similarity.SimilarityProvider;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.index.store.IndexStoreConfig;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.cache.query.IndicesQueryCache;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 
@@ -241,7 +243,8 @@ public final class IndexModule {
         IndexSearcherWrapper newWrapper(final IndexService indexService);
     }
 
-    public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry) throws IOException {
+    public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
+                                        IndexingOperationListener... listeners) throws IOException {
         final IndexSettings settings = indexSettings.newWithListener(settingsConsumers);
         IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
         IndexEventListener eventListener = freeze();
@@ -263,6 +266,6 @@ public final class IndexModule {
         final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
         final QueryCache queryCache = queryCacheProvider.apply(settings, servicesProvider.getIndicesQueryCache());
         return new IndexService(settings, environment, new SimilarityService(settings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
-                servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry);
+                servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, listeners);
     }
 }

+ 21 - 13
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -19,6 +19,17 @@
 
 package org.elasticsearch.index;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Query;
@@ -51,6 +62,7 @@ import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.shard.ShadowIndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -64,17 +76,6 @@ import org.elasticsearch.indices.InvalidAliasNameException;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@@ -102,6 +103,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
     private final AtomicBoolean deleted = new AtomicBoolean(false);
     private final IndexSettings indexSettings;
     private final IndexingSlowLog slowLog;
+    private final IndexingOperationListener[] listeners;
 
     public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
                         SimilarityService similarityService,
@@ -113,7 +115,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
                         IndexStore indexStore,
                         IndexEventListener eventListener,
                         IndexModule.IndexSearcherWrapperFactory wrapperFactory,
-                        MapperRegistry mapperRegistry) throws IOException {
+                        MapperRegistry mapperRegistry,
+                        IndexingOperationListener... listenersIn) throws IOException {
         super(indexSettings);
         this.indexSettings = indexSettings;
         this.analysisService = registry.build(indexSettings);
@@ -132,6 +135,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
         this.searcherWrapper = wrapperFactory.newWrapper(this);
         this.slowLog = new IndexingSlowLog(indexSettings.getSettings());
+
+        // Add our slowLog to the incoming IndexingOperationListeners:
+        this.listeners = new IndexingOperationListener[1+listenersIn.length];
+        this.listeners[0] = slowLog;
+        System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
     }
 
     public int numberOfShards() {
@@ -296,7 +304,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             if (useShadowEngine(primary, indexSettings)) {
                 indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow  engines don't index
             } else {
-                indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog);
+                indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
             }
 
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

+ 1 - 1
core/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java

@@ -183,7 +183,7 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         boolean isEnabled = getIORateLimitMBPerSec() != Double.POSITIVE_INFINITY;
         if (config.isAutoThrottle() && isEnabled == false) {
             enableAutoIOThrottle();
-        } else if (config.isAutoThrottle() == false && isEnabled){
+        } else if (config.isAutoThrottle() == false && isEnabled) {
             disableAutoIOThrottle();
         }
     }

+ 20 - 3
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -35,6 +35,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.join.BitSetProducer;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
 import org.elasticsearch.ExceptionsHelper;
@@ -430,8 +431,8 @@ public abstract class Engine implements Closeable {
         stats.addIndexWriterMaxMemoryInBytes(0);
     }
 
-    /** How much heap Lucene's IndexWriter is using */
-    abstract public long indexWriterRAMBytesUsed();
+    /** How much heap is used that would be freed by a refresh.  Note that this may throw {@link AlreadyClosedException}. */
+    abstract public long getIndexBufferRAMBytesUsed();
 
     protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
         ensureOpen();
@@ -529,11 +530,17 @@ public abstract class Engine implements Closeable {
     }
 
     /**
-     * Refreshes the engine for new search operations to reflect the latest
+     * Synchronously refreshes the engine for new search operations to reflect the latest
      * changes.
      */
     public abstract void refresh(String source) throws EngineException;
 
+    /**
+     * Called when our engine is using too much heap and should move buffered indexed/deleted documents to disk.
+     */
+    // NOTE: do NOT rename this to something containing flush or refresh!
+    public abstract void writeIndexingBuffer() throws EngineException;
+
     /**
      * Flushes the state of the engine including the transaction log, clearing memory.
      *
@@ -1142,4 +1149,14 @@ public abstract class Engine implements Closeable {
          */
         void warm(Engine.Searcher searcher, boolean isTopLevelReader);
     }
+
+    /**
+     * Request that this engine throttle incoming indexing requests to one thread.  Must be matched by a later call to {@link deactivateThrottling}.
+     */
+    public abstract void activateThrottling();
+
+    /**
+     * Reverses a previous {@link #activateThrottling} call.
+     */
+    public abstract void deactivateThrottling();
 }

+ 6 - 57
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -26,6 +26,7 @@ import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.similarities.Similarity;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
@@ -49,9 +50,7 @@ public final class EngineConfig {
     private final ShardId shardId;
     private final TranslogRecoveryPerformer translogRecoveryPerformer;
     private final IndexSettings indexSettings;
-    private volatile ByteSizeValue indexingBufferSize;
-    private volatile ByteSizeValue versionMapSize;
-    private volatile String versionMapSizeSetting;
+    private final ByteSizeValue indexingBufferSize;
     private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
     private volatile boolean enableGcDeletes = true;
     private final TimeValue flushMergesAfter;
@@ -82,22 +81,12 @@ public final class EngineConfig {
      */
     public static final String INDEX_CODEC_SETTING = "index.codec";
 
-    /**
-     * The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of
-     * the current index memory buffer (defaults to 25%)
-     */
-    public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";
-
-
     /** if set to true the engine will start even if the translog id in the commit point can not be found */
     public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
 
-
     public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
     public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
 
-    public static final String DEFAULT_VERSION_MAP_SIZE = "25%";
-
     private static final String DEFAULT_CODEC_NAME = "default";
     private TranslogConfig translogConfig;
     private boolean create = false;
@@ -124,11 +113,11 @@ public final class EngineConfig {
         this.codecService = codecService;
         this.eventListener = eventListener;
         codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
-        // We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
-        indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
+        // We give IndexWriter a "huge" (256 MB) buffer, so it won't flush on its own unless the ES indexing buffer is also huge and/or
+        // there are not too many shards allocated to this node.  Instead, IndexingMemoryController periodically checks
+        // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
+        indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
         gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
-        versionMapSizeSetting = settings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
-        updateVersionMapSize();
         this.translogRecoveryPerformer = translogRecoveryPerformer;
         this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
         this.queryCache = queryCache;
@@ -137,51 +126,11 @@ public final class EngineConfig {
         this.flushMergesAfter = flushMergesAfter;
     }
 
-    /** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
-    private void updateVersionMapSize() {
-        if (versionMapSizeSetting.endsWith("%")) {
-            double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1));
-            versionMapSize = new ByteSizeValue((long) ((double) indexingBufferSize.bytes() * (percent / 100)));
-        } else {
-            versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting, INDEX_VERSION_MAP_SIZE);
-        }
-    }
-
-    /**
-     * Settings the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
-     */
-    public void setVersionMapSizeSetting(String versionMapSizeSetting) {
-        this.versionMapSizeSetting = versionMapSizeSetting;
-        updateVersionMapSize();
-    }
-
-    /**
-     * current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
-     */
-    public String getVersionMapSizeSetting() {
-        return versionMapSizeSetting;
-    }
-
     /** if true the engine will start even if the translog id in the commit point can not be found */
     public boolean forceNewTranslog() {
         return forceNewTranslog;
     }
 
-    /**
-     * returns the size of the version map that should trigger a refresh
-     */
-    public ByteSizeValue getVersionMapSize() {
-        return versionMapSize;
-    }
-
-    /**
-     * Sets the indexing buffer
-     */
-    public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
-        this.indexingBufferSize = indexingBufferSize;
-        updateVersionMapSize();
-    }
-
     /**
      * Enables / disables gc deletes
      *

+ 58 - 46
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -55,6 +55,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.math.MathUtils;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -118,6 +119,11 @@ public class InternalEngine extends Engine {
 
     private final IndexThrottle throttle;
 
+    // How many callers are currently requesting index throttling.  Currently there are only two situations where we do this: when merges
+    // are falling behind and when writing indexing buffer to disk is too slow.  When this is 0, there is no throttling, else we throttling
+    // incoming indexing ops to a single thread:
+    private final AtomicInteger throttleRequestCount = new AtomicInteger();
+
     public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
         super(engineConfig);
         this.versionMap = new LiveVersionMap();
@@ -306,15 +312,6 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private void updateIndexWriterSettings() {
-        try {
-            final LiveIndexWriterConfig iwc = indexWriter.getConfig();
-            iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac());
-        } catch (AlreadyClosedException ex) {
-            // ignore
-        }
-    }
-
     @Override
     public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
         try (ReleasableLock lock = readLock.acquire()) {
@@ -359,13 +356,12 @@ public class InternalEngine extends Engine {
             maybeFailEngine("index", t);
             throw new IndexFailedEngineException(shardId, index.type(), index.id(), t);
         }
-        checkVersionMapRefresh();
         return created;
     }
 
     private boolean innerIndex(Index index) throws IOException {
         synchronized (dirtyLock(index.uid())) {
-            lastWriteNanos  = index.startTime();
+            lastWriteNanos = index.startTime();
             final long currentVersion;
             final boolean deleted;
             VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
@@ -423,33 +419,6 @@ public class InternalEngine extends Engine {
         }
     }
 
-    /**
-     * Forces a refresh if the versionMap is using too much RAM
-     */
-    private void checkVersionMapRefresh() {
-        if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
-            try {
-                if (isClosed.get()) {
-                    // no point...
-                    return;
-                }
-                // Now refresh to clear versionMap:
-                engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            refresh("version_table_full");
-                        } catch (EngineClosedException ex) {
-                            // ignore
-                        }
-                    }
-                });
-            } catch (EsRejectedExecutionException ex) {
-                // that is fine too.. we might be shutting down
-            }
-        }
-    }
-
     @Override
     public void delete(Delete delete) throws EngineException {
         try (ReleasableLock lock = readLock.acquire()) {
@@ -462,7 +431,6 @@ public class InternalEngine extends Engine {
         }
 
         maybePruneDeletedTombstones();
-        checkVersionMapRefresh();
     }
 
     private void maybePruneDeletedTombstones() {
@@ -547,6 +515,43 @@ public class InternalEngine extends Engine {
         mergeScheduler.refreshConfig();
     }
 
+    @Override
+    public void writeIndexingBuffer() throws EngineException {
+
+        // we obtain a read lock here, since we don't want a flush to happen while we are writing
+        // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+
+            // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
+            // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
+            // refresh API), and another for version map interactions.  See #15768.
+            final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
+            final long indexingBufferBytes = indexWriter.ramBytesUsed();
+
+            final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
+            if (useRefresh) {
+                // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
+                logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
+                             new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
+                refresh("write indexing buffer");
+            } else {
+                // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
+                logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
+                             new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
+                indexWriter.flush();
+            }
+        } catch (AlreadyClosedException e) {
+            ensureOpen();
+            maybeFailEngine("writeIndexingBuffer", e);
+        } catch (EngineClosedException e) {
+            throw e;
+        } catch (Throwable t) {
+            failEngine("writeIndexingBuffer failed", t);
+            throw new RefreshFailedEngineException(shardId, t);
+        }
+    }
+
     @Override
     public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
         // best effort attempt before we acquire locks
@@ -821,8 +826,8 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public long indexWriterRAMBytesUsed() {
-        return indexWriter.ramBytesUsed();
+    public long getIndexBufferRAMBytesUsed() {
+        return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
     }
 
     @Override
@@ -1044,12 +1049,22 @@ public class InternalEngine extends Engine {
         }
     }
 
+    @Override
     public void activateThrottling() {
-        throttle.activate();
+        int count = throttleRequestCount.incrementAndGet();
+        assert count >= 1: "invalid post-increment throttleRequestCount=" + count;
+        if (count == 1) {
+            throttle.activate();
+        }
     }
 
+    @Override
     public void deactivateThrottling() {
-        throttle.deactivate();
+        int count = throttleRequestCount.decrementAndGet();
+        assert count >= 0: "invalid post-decrement throttleRequestCount=" + count;
+        if (count == 0) {
+            throttle.deactivate();
+        }
     }
 
     public long getIndexThrottleTimeInMillis() {
@@ -1162,9 +1177,6 @@ public class InternalEngine extends Engine {
 
     public void onSettingsChanged() {
         mergeScheduler.refreshConfig();
-        updateIndexWriterSettings();
-        // config().getVersionMapSize() may have changed:
-        checkVersionMapRefresh();
         // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
         maybePruneDeletedTombstones();
     }
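
The activateThrottling/deactivateThrottling changes above replace direct calls to the engine's IndexThrottle with a reference count, so that the two independent callers named in the comment (merges falling behind, and indexing-buffer writes falling behind) can each request throttling without one caller's deactivate turning it off for the other. A minimal standalone sketch of that pattern, with hypothetical names and callbacks, looks like this:

import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of reference-counted throttling; not the Elasticsearch API. */
final class RefCountedThrottle {
    private final AtomicInteger requests = new AtomicInteger();
    private final Runnable engage;  // e.g. swap in a single-permit lock so only one thread indexes
    private final Runnable release; // e.g. swap back the no-op lock

    RefCountedThrottle(Runnable engage, Runnable release) {
        this.engage = engage;
        this.release = release;
    }

    void activateThrottling() {
        if (requests.incrementAndGet() == 1) {
            engage.run(); // only the first requester actually engages the throttle
        }
    }

    void deactivateThrottling() {
        if (requests.decrementAndGet() == 0) {
            release.run(); // only when the last requester backs off is the throttle released
        }
    }
}

With this pattern, indexing stays throttled until every caller that activated throttling has deactivated it again.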

+ 18 - 2
core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

@@ -227,8 +227,24 @@ public class ShadowEngine extends Engine {
     }
 
     @Override
-    public long indexWriterRAMBytesUsed() {
-        // No IndexWriter
+    public long getIndexBufferRAMBytesUsed() {
+        // No IndexWriter nor version map
+        throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
+    }
+
+    @Override
+    public void writeIndexingBuffer() {
+        // No indexing buffer
+        throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
+    }
+
+    @Override
+    public void activateThrottling() {
+        throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
+    }
+
+    @Override
+    public void deactivateThrottling() {
         throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
     }
 }

+ 0 - 8
core/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

@@ -70,7 +70,6 @@ public interface IndexEventListener {
      */
     default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {}
 
-
     /**
      * Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
      * The order of concurrent events is preserved. The execution must be lightweight.
@@ -89,13 +88,6 @@ public interface IndexEventListener {
      */
     default void onShardInactive(IndexShard indexShard) {}
 
-    /**
-     * Called when a shard is marked as active ie. was previously inactive and is now active again.
-     *
-     * @param indexShard The shard that was marked active
-     */
-    default void onShardActive(IndexShard indexShard) {}
-
     /**
      * Called before the index gets created. Note that this is also called
      * when the index is created on data nodes

+ 120 - 114
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -97,8 +97,8 @@ import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.search.stats.ShardSearchStats;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
-import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.Store.MetadataSnapshot;
+import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
@@ -133,9 +133,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-
 public class IndexShard extends AbstractIndexShardComponent {
 
     private final ThreadPool threadPool;
@@ -167,6 +167,12 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final IndexEventListener indexEventListener;
     private final IndexSettings idxSettings;
     private final NodeServicesProvider provider;
+
+    /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh.  IndexingMemoryController polls this
+     *  across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
+     *  being indexed/deleted. */
+    private final AtomicLong writingBytes = new AtomicLong();
+
     private TimeValue refreshInterval;
 
     private volatile ScheduledFuture<?> refreshScheduledFuture;
@@ -194,9 +200,7 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
     public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
-    /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
-    public static final String INDEX_SHARD_INACTIVE_TIME_SETTING = "index.shard.inactive_time";
-    private static final String INDICES_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
+    public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
 
     private final ShardPath path;
 
@@ -205,7 +209,6 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
 
     private final IndexSearcherWrapper searcherWrapper;
-    private final TimeValue inactiveTime;
 
     /**
      * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
@@ -219,7 +222,6 @@ public class IndexShard extends AbstractIndexShardComponent {
                       IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
         super(shardId, indexSettings);
         final Settings settings = indexSettings.getSettings();
-        this.inactiveTime = settings.getAsTime(INDEX_SHARD_INACTIVE_TIME_SETTING, settings.getAsTime(INDICES_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)));
         this.idxSettings = indexSettings;
         this.codecService = new CodecService(mapperService, logger);
         this.warmer = provider.getWarmer();
@@ -272,8 +274,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.provider = provider;
         this.searcherWrapper = indexSearcherWrapper;
         this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext());
-        // We start up inactive
-        active.set(false);
     }
 
     public Store store() {
@@ -488,7 +488,7 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public boolean index(Engine.Index index) {
         ensureWriteAllowed(index);
-        markLastWrite();
+        active.set(true);
         index = indexingOperationListeners.preIndex(index);
         final boolean created;
         try {
@@ -506,7 +506,9 @@ public class IndexShard extends AbstractIndexShardComponent {
             indexingOperationListeners.postIndex(index, ex);
             throw ex;
         }
+
         indexingOperationListeners.postIndex(index);
+
         return created;
     }
 
@@ -528,10 +530,9 @@ public class IndexShard extends AbstractIndexShardComponent {
         return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false);
     }
 
-
     public void delete(Engine.Delete delete) {
         ensureWriteAllowed(delete);
-        markLastWrite();
+        active.set(true);
         delete = indexingOperationListeners.preDelete(delete);
         try {
             if (logger.isTraceEnabled()) {
@@ -548,6 +549,7 @@ public class IndexShard extends AbstractIndexShardComponent {
             indexingOperationListeners.postDelete(delete, ex);
             throw ex;
         }
+
         indexingOperationListeners.postDelete(delete);
     }
 
@@ -556,14 +558,32 @@ public class IndexShard extends AbstractIndexShardComponent {
         return getEngine().get(get, this::acquireSearcher);
     }
 
+    /** Writes all indexing changes to disk and opens a new searcher reflecting all changes.  This can throw {@link EngineClosedException}. */
     public void refresh(String source) {
         verifyNotClosed();
-        if (logger.isTraceEnabled()) {
-            logger.trace("refresh with source: {}", source);
+        if (canIndex()) {
+            long bytes = getEngine().getIndexBufferRAMBytesUsed();
+            writingBytes.addAndGet(bytes);
+            try {
+                logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
+                long time = System.nanoTime();
+                getEngine().refresh(source);
+                refreshMetric.inc(System.nanoTime() - time);
+            } finally {
+                logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
+                writingBytes.addAndGet(-bytes);
+            }
+        } else {
+            logger.debug("refresh with source [{}]", source);
+            long time = System.nanoTime();
+            getEngine().refresh(source);
+            refreshMetric.inc(System.nanoTime() - time);
         }
-        long time = System.nanoTime();
-        getEngine().refresh(source);
-        refreshMetric.inc(System.nanoTime() - time);
+    }
+
+    /** Returns how many bytes we are currently moving from heap to disk */
+    public long getWritingBytes() {
+        return writingBytes.get();
     }
 
     public RefreshStats refreshStats() {
@@ -954,13 +974,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
-    private void markLastWrite() {
-        if (active.getAndSet(true) == false) {
-            indexEventListener.onShardActive(this);
-        }
-    }
-
     private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
         Engine.Operation.Origin origin = op.origin();
         IndexShardState state = this.state; // one time volatile read
@@ -1018,85 +1031,34 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
-
-    public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
-        this.shardEventListener.delegates.add(onShardFailure);
-    }
-
-    /**
-     * Change the indexing and translog buffer sizes.  If {@code IndexWriter} is currently using more than
-     * the new buffering indexing size then we do a refresh to free up the heap.
-     */
-    public void updateBufferSize(ByteSizeValue shardIndexingBufferSize) {
-
-        final EngineConfig config = engineConfig;
-        final ByteSizeValue preValue = config.getIndexingBufferSize();
-
-        config.setIndexingBufferSize(shardIndexingBufferSize);
-
+    /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
+    public long getIndexBufferRAMBytesUsed() {
         Engine engine = getEngineOrNull();
         if (engine == null) {
-            logger.debug("updateBufferSize: engine is closed; skipping");
-            return;
+            return 0;
         }
-
-        // update engine if it is already started.
-        if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
-            // so we push changes these changes down to IndexWriter:
-            engine.onSettingsChanged();
-
-            long iwBytesUsed = engine.indexWriterRAMBytesUsed();
-
-            String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes",
-                preValue, shardIndexingBufferSize, iwBytesUsed);
-
-            if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
-                // our allowed buffer was changed to less than we are currently using; we ask IW to refresh
-                // so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
-                logger.debug(message + "; now refresh to clear IndexWriter memory");
-
-                // TODO: should IW have an API to move segments to disk, but not refresh?  Its flush method is protected...
-                try {
-                    refresh("update index buffer");
-                } catch (Throwable e) {
-                    logger.warn("failed to refresh after decreasing index buffer", e);
-                }
-            } else {
-                logger.debug(message);
-            }
+        try {
+            return engine.getIndexBufferRAMBytesUsed();
+        } catch (AlreadyClosedException ex) {
+            return 0;
         }
     }
 
-    /**
-     * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
-     * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so.  This returns true
-     * if the shard is inactive.
-     */
-    public boolean checkIdle() {
-        return checkIdle(inactiveTime.nanos());
+    public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
+        this.shardEventListener.delegates.add(onShardFailure);
     }
 
-    final boolean checkIdle(long inactiveTimeNS) { // pkg private for testing
+    /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
+     *  indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */
+    public void checkIdle(long inactiveTimeNS) {
         Engine engineOrNull = getEngineOrNull();
         if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
             boolean wasActive = active.getAndSet(false);
             if (wasActive) {
-                updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
-                logger.debug("marking shard as inactive (inactive_time=[{}]) indexing wise", inactiveTime);
+                logger.debug("shard is now inactive");
                 indexEventListener.onShardInactive(this);
             }
         }
-
-        return active.get() == false;
-    }
-
-    /**
-     * Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
-     * IndexShard#INDEX_SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}.
-     */
-    public boolean getActive() {
-        return active.get();
     }
 
     public final boolean isFlushOnClose() {
@@ -1194,11 +1156,6 @@ public class IndexShard extends AbstractIndexShardComponent {
                 change = true;
             }
 
-            final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
-            if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
-                config.setVersionMapSizeSetting(versionMapSize);
-            }
-
             final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
             if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
                 logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
@@ -1253,8 +1210,70 @@ public class IndexShard extends AbstractIndexShardComponent {
         return indexEventListener;
     }
 
-    public TimeValue getInactiveTime() {
-        return inactiveTime;
+    public void activateThrottling() {
+        try {
+            getEngine().activateThrottling();
+        } catch (EngineClosedException ex) {
+            // ignore
+        }
+    }
+
+    public void deactivateThrottling() {
+        try {
+            getEngine().deactivateThrottling();
+        } catch (EngineClosedException ex) {
+            // ignore
+        }
+    }
+
+    private void handleRefreshException(Exception e) {
+        if (e instanceof EngineClosedException) {
+            // ignore
+        } else if (e instanceof RefreshFailedEngineException) {
+            RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
+            if (rfee.getCause() instanceof InterruptedException) {
+                // ignore, we are being shutdown
+            } else if (rfee.getCause() instanceof ClosedByInterruptException) {
+                // ignore, we are being shutdown
+            } else if (rfee.getCause() instanceof ThreadInterruptedException) {
+                // ignore, we are being shutdown
+            } else {
+                if (state != IndexShardState.CLOSED) {
+                    logger.warn("Failed to perform engine refresh", e);
+                }
+            }
+        } else {
+            if (state != IndexShardState.CLOSED) {
+                logger.warn("Failed to perform engine refresh", e);
+            }
+        }
+    }
+
+    /**
+     * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
+     */
+    public void writeIndexingBuffer() {
+        if (canIndex() == false) {
+            throw new UnsupportedOperationException();
+        }
+        try {
+            Engine engine = getEngine();
+            long bytes = engine.getIndexBufferRAMBytesUsed();
+
+            // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
+            // memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
+            // there's still up to the 20% being used and continue writing if necessary:
+            logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
+            writingBytes.addAndGet(bytes);
+            try {
+                engine.writeIndexingBuffer();
+            } finally {
+                writingBytes.addAndGet(-bytes);
+                logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
+            }
+        } catch (Exception e) {
+            handleRefreshException(e);
+        };
     }
 
     /**
@@ -1265,7 +1284,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         internalIndexingStats.noopUpdate(type);
     }
 
-    class EngineRefresher implements Runnable {
+    final class EngineRefresher implements Runnable {
         @Override
         public void run() {
             // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule
@@ -1277,27 +1296,13 @@ public class IndexShard extends AbstractIndexShardComponent {
                 @Override
                 public void run() {
                     try {
+                        // TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and
+                        // reschedule if so...
                         if (getEngine().refreshNeeded()) {
                             refresh("schedule");
                         }
-                    } catch (EngineClosedException e) {
-                        // we are being closed, ignore
-                    } catch (RefreshFailedEngineException e) {
-                        if (e.getCause() instanceof InterruptedException) {
-                            // ignore, we are being shutdown
-                        } else if (e.getCause() instanceof ClosedByInterruptException) {
-                            // ignore, we are being shutdown
-                        } else if (e.getCause() instanceof ThreadInterruptedException) {
-                            // ignore, we are being shutdown
-                        } else {
-                            if (state != IndexShardState.CLOSED) {
-                                logger.warn("Failed to perform scheduled engine refresh", e);
-                            }
-                        }
                     } catch (Exception e) {
-                        if (state != IndexShardState.CLOSED) {
-                            logger.warn("Failed to perform scheduled engine refresh", e);
-                        }
+                        handleRefreshException(e);
                     }
 
                     reschedule();
@@ -1493,7 +1498,8 @@ public class IndexShard extends AbstractIndexShardComponent {
         final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
         return new EngineConfig(shardId,
             threadPool, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
-            mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, inactiveTime);
+            mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
+            idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
     }
 
     private static class IndexShardOperationCounter extends AbstractRefCounted {
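
The new writingBytes counter above records how much indexing-buffer heap a shard is currently moving to disk: it is incremented before a refresh or writeIndexingBuffer call and decremented in a finally block, and IndexingMemoryController polls it across shards to detect when writing is falling behind incoming indexing and throttling is needed. A standalone sketch of that accounting pattern (hypothetical class, not the ES API):

import java.util.concurrent.atomic.AtomicLong;

/** Sketch of "bytes currently being written" accounting around a buffer write. */
final class WritingBytesTracker {
    private final AtomicLong writingBytes = new AtomicLong();

    /** Bytes currently in flight from heap to disk; a controller can poll this across shards. */
    long getWritingBytes() {
        return writingBytes.get();
    }

    /** Wraps a refresh or flush so the in-flight bytes stay visible for its whole duration. */
    void writeBuffer(long bufferedBytes, Runnable writeToDisk) {
        writingBytes.addAndGet(bufferedBytes);
        try {
            writeToDisk.run(); // e.g. engine.refresh(...) or IndexWriter.flush()
        } finally {
            writingBytes.addAndGet(-bufferedBytes);
        }
    }
}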

+ 1 - 1
core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -92,7 +92,7 @@ public class TranslogWriter extends TranslogReader {
             writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
             final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
             return writer;
-        } catch (Throwable throwable){
+        } catch (Throwable throwable) {
             // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
             // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
             IOUtils.closeWhileHandlingException(channel);

+ 224 - 83
core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

@@ -24,23 +24,32 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
-import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
-public class IndexingMemoryController extends AbstractComponent implements IndexEventListener, Closeable {
+public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
 
     /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
     public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@@ -51,34 +60,30 @@ public class IndexingMemoryController extends AbstractComponent implements Index
     /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
     public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
 
-    /** Sets a floor on the per-shard index buffer size (default: 4 MB). */
-    public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size";
-
-    /** Sets a ceiling on the per-shard index buffer size (default: 512 MB). */
-    public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size";
+    /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
+    public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
 
-    /** Sets a floor on the per-shard translog buffer size (default: 2 KB). */
-    public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size";
+    /** Default value (5 minutes) for indices.memory.shard_inactive_time */
+    public static final TimeValue SHARD_DEFAULT_INACTIVE_TIME = TimeValue.timeValueMinutes(5);
 
-    /** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */
-    public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size";
+    /** How frequently we check indexing memory usage (default: 5 seconds). */
+    public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
 
-    /** How frequently we check shards to find inactive ones (default: 30 seconds). */
-    public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
-
-    /** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */
-    public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
+    private final ThreadPool threadPool;
 
     private final IndicesService indicesService;
 
     private final ByteSizeValue indexingBuffer;
-    private final ByteSizeValue minShardIndexBufferSize;
-    private final ByteSizeValue maxShardIndexBufferSize;
+
+    private final TimeValue inactiveTime;
     private final TimeValue interval;
 
+    /** Contains shards currently being throttled because we can't write segments quickly enough */
+    private final Set<IndexShard> throttled = new HashSet<>();
+
     private final ScheduledFuture scheduler;
 
-    private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
+    private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
             IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
 
     private final ShardsIndicesStatusChecker statusChecker;
@@ -110,21 +115,21 @@ public class IndexingMemoryController extends AbstractComponent implements Index
             indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING);
         }
         this.indexingBuffer = indexingBuffer;
-        this.minShardIndexBufferSize = this.settings.getAsBytesSize(MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(4, ByteSizeUnit.MB));
-        // LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155
-        this.maxShardIndexBufferSize = this.settings.getAsBytesSize(MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(512, ByteSizeUnit.MB));
 
-        // we need to have this relatively small to move a shard from inactive to active fast (enough)
-        this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
+        this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, SHARD_DEFAULT_INACTIVE_TIME);
+        // we need to have this relatively small to free up heap quickly enough
+        this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
 
         this.statusChecker = new ShardsIndicesStatusChecker();
 
-        logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}]",
-                this.indexingBuffer,
-                MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
-                MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
-                SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
+        logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
+                     this.indexingBuffer,
+                     SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
+                     SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
         this.scheduler = scheduleTask(threadPool);
+
+        // Need to save this so we can later launch async "write indexing buffer to disk" on shards:
+        this.threadPool = threadPool;
     }
 
     protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
@@ -150,7 +155,8 @@ public class IndexingMemoryController extends AbstractComponent implements Index
 
         for (IndexService indexService : indicesService) {
             for (IndexShard shard : indexService) {
-                if (shardAvailable(shard)) {
+                // shadow replica doesn't have an indexing buffer
+                if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
                     availableShards.add(shard);
                 }
             }
@@ -158,85 +164,220 @@ public class IndexingMemoryController extends AbstractComponent implements Index
         return availableShards;
     }
 
-    /** returns true if shard exists and is availabe for updates */
-    protected boolean shardAvailable(IndexShard shard) {
-        // shadow replica doesn't have an indexing buffer
-        return shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
+    /** returns how much heap this shard is using for its indexing buffer */
+    protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
+        return shard.getIndexBufferRAMBytesUsed();
     }
 
-    /** set new indexing and translog buffers on this shard.  this may cause the shard to refresh to free up heap. */
-    protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize) {
-        try {
-            shard.updateBufferSize(shardIndexingBufferSize);
-        } catch (EngineClosedException | FlushNotAllowedEngineException e) {
-            // ignore
-        } catch (Exception e) {
-            logger.warn("failed to set shard {} index buffer to [{}]", e, shard.shardId(), shardIndexingBufferSize);
-        }
+    /** returns how many bytes this shard is currently writing to disk */
+    protected long getShardWritingBytes(IndexShard shard) {
+        return shard.getWritingBytes();
+    }
+
+    /** ask this shard to refresh, in the background, to free up heap */
+    protected void writeIndexingBufferAsync(IndexShard shard) {
+        threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
+            @Override
+            public void doRun() {
+                shard.writeIndexingBuffer();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.warn("failed to write indexing buffer for shard [{}]; ignoring", t, shard.shardId());
+            }
+        });
     }
 
-    /** check if any shards active status changed, now. */
+    /** force checker to run now */
     void forceCheck() {
         statusChecker.run();
     }
 
-    class ShardsIndicesStatusChecker implements Runnable {
+    /** called by IndexShard to record that this many bytes were written to translog */
+    public void bytesWritten(int bytes) {
+        statusChecker.bytesWritten(bytes);
+    }
+
+    /** Asks this shard to throttle indexing to one thread */
+    protected void activateThrottling(IndexShard shard) {
+        shard.activateThrottling();
+    }
+
+    /** Asks this shard to stop throttling indexing to one thread */
+    protected void deactivateThrottling(IndexShard shard) {
+        shard.deactivateThrottling();
+    }
+
+    @Override
+    public void postIndex(Engine.Index index) {
+        bytesWritten(index.getTranslogLocation().size);
+    }
+
+    @Override
+    public void postDelete(Engine.Delete delete) {
+        bytesWritten(delete.getTranslogLocation().size);
+    }
+
+    private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
+        final long bytesUsed;
+        final IndexShard shard;
+
+        public ShardAndBytesUsed(long bytesUsed, IndexShard shard) {
+            this.bytesUsed = bytesUsed;
+            this.shard = shard;
+        }
+
         @Override
-        public synchronized void run() {
-            List<IndexShard> availableShards = availableShards();
-            List<IndexShard> activeShards = new ArrayList<>();
-            for (IndexShard shard : availableShards) {
-                if (!checkIdle(shard)) {
-                    activeShards.add(shard);
+        public int compareTo(ShardAndBytesUsed other) {
+            // Sort larger shards first:
+            return Long.compare(other.bytesUsed, bytesUsed);
+        }
+    }
+
+    /** not static because we need access to many fields/methods from our containing class (IMC): */
+    final class ShardsIndicesStatusChecker implements Runnable {
+
+        final AtomicLong bytesWrittenSinceCheck = new AtomicLong();
+        final ReentrantLock runLock = new ReentrantLock();
+
+        /** Shard calls this on each indexing/delete op */
+        public void bytesWritten(int bytes) {
+            long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
+            while (totalBytes > indexingBuffer.bytes()/30) {
+                if (runLock.tryLock()) {
+                    try {
+                        bytesWrittenSinceCheck.addAndGet(-totalBytes);
+                        // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
+                        // typically smaller but can be larger in extreme cases (many unique terms).  This logic is here only as a safety against
+                        // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
+                        // processed by indexing:
+                        runUnlocked();
+                    } finally {
+                        runLock.unlock();
+                    }
+                } else {
+                    break;
                 }
             }
-            int activeShardCount = activeShards.size();
+        }
 
-            // TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
-            // is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
-            // get the same indexing buffer as large indices.  But it quickly gets tricky...
-            if (activeShardCount == 0) {
-                return;
+        @Override
+        public void run() {
+            runLock.lock();
+            try {
+                runUnlocked();
+            } finally {
+                runLock.unlock();
             }
+        }
+
+        private void runUnlocked() {
+            // NOTE: even if we hit an errant exc here, our ThreadPool.scheduledWithFixedDelay will log the exception and re-invoke us
+            // again, on schedule
+
+            // First pass to sum up how much heap all shards' indexing buffers are using now, and how many bytes they are currently moving
+            // to disk:
+            long totalBytesUsed = 0;
+            long totalBytesWriting = 0;
+            for (IndexShard shard : availableShards()) {
+
+                // Give shard a chance to transition to inactive so sync'd flush can happen:
+                checkIdle(shard, inactiveTime.nanos());
+
+                // How many bytes this shard is currently (async'd) moving from heap to disk:
+                long shardWritingBytes = getShardWritingBytes(shard);
+
+                // How many heap bytes this shard is currently using
+                long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);
+
+                shardBytesUsed -= shardWritingBytes;
+                totalBytesWriting += shardWritingBytes;
 
-            ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShardCount);
-            if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
-                shardIndexingBufferSize = minShardIndexBufferSize;
+                // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could
+                // have a negative value here.  So we just skip this shard since that means it's now using very little heap:
+                if (shardBytesUsed < 0) {
+                    continue;
+                }
+
+                totalBytesUsed += shardBytesUsed;
             }
-            if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
-                shardIndexingBufferSize = maxShardIndexBufferSize;
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]",
+                             new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting));
             }
 
-            logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize);
+            // If the indexing-buffer heap plus the bytes still being moved to disk exceed our budget by more than 50%
+            // (i.e. 150% of it), we also throttle the top shards to send back-pressure to ongoing indexing:
+            boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.bytes();
+
+            if (totalBytesUsed > indexingBuffer.bytes()) {
+                // OK we are now over-budget; fill the priority queue and ask largest shard(s) to refresh:
+                PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
+
+                for (IndexShard shard : availableShards()) {
+                    // How many bytes this shard is currently (async'd) moving from heap to disk:
+                    long shardWritingBytes = getShardWritingBytes(shard);
+
+                    // How many heap bytes this shard is currently using
+                    long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);
+
+                    // Only count up bytes not already being refreshed:
+                    shardBytesUsed -= shardWritingBytes;
+
+                    // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could
+                    // have a negative value here.  So we just skip this shard since that means it's now using very little heap:
+                    if (shardBytesUsed < 0) {
+                        continue;
+                    }
+
+                    if (shardBytesUsed > 0) {
+                        if (logger.isTraceEnabled()) {
+                            if (shardWritingBytes != 0) {
+                                logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, shardWritingBytes);
+                            } else {
+                                logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed);
+                            }
+                        }
+                        queue.add(new ShardAndBytesUsed(shardBytesUsed, shard));
+                    }
+                }
 
-            for (IndexShard shard : activeShards) {
-                updateShardBuffers(shard, shardIndexingBufferSize);
+                logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}], [{}] shards with non-zero indexing buffer",
+                             new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting), queue.size());
+
+                while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) {
+                    ShardAndBytesUsed largest = queue.poll();
+                    logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed));
+                    writeIndexingBufferAsync(largest.shard);
+                    totalBytesUsed -= largest.bytesUsed;
+                    if (doThrottle && throttled.contains(largest.shard) == false) {
+                        logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId());
+                        throttled.add(largest.shard);
+                        activateThrottling(largest.shard);
+                    }
+                }
             }
-        }
-    }
 
-    protected long currentTimeInNanos() {
-        return System.nanoTime();
+            if (doThrottle == false) {
+                for(IndexShard shard : throttled) {
+                    logger.info("stop throttling indexing for shard [{}]", shard.shardId());
+                    deactivateThrottling(shard);
+                }
+                throttled.clear();
+            }
+        }
     }
 
     /**
-     * ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so.
-     * return false if the shard is not idle, otherwise true
+     * ask this shard to check now whether it is inactive, and reduce its indexing buffer if so.
      */
-    protected boolean checkIdle(IndexShard shard) {
+    protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
         try {
-            return shard.checkIdle();
+            shard.checkIdle(inactiveTimeNS);
         } catch (EngineClosedException | FlushNotAllowedEngineException e) {
-            logger.trace("ignore [{}] while marking shard {} as inactive", e.getClass().getSimpleName(), shard.shardId());
-            return true;
+            logger.trace("ignore exception while checking if shard {} is inactive", e, shard.shardId());
         }
     }
-
-    @Override
-    public void onShardActive(IndexShard indexShard) {
-        // At least one shard used to be inactive ie. a new write operation just showed up.
-        // We try to fix the shards indexing buffer immediately. We could do this async instead, but cost should
-        // be low, and it's rare this happens.
-        forceCheck();
-    }
 }
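
The controller's new behaviour boils down to two thresholds: once the summed indexing buffers exceed the node-level budget, the largest shards are asked to write their buffers to disk; once buffers plus bytes still being written exceed 150% of that budget, those same shards are additionally throttled. A standalone sketch of that decision rule (illustrative only, not part of this commit; the 100 MB budget and per-shard byte counts are made up):

import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class SharedIndexingBufferSketch {

    /** Mirrors ShardAndBytesUsed from the diff above: larger buffers sort first. */
    static final class ShardBytes implements Comparable<ShardBytes> {
        final long bytesUsed;
        final String shard;

        ShardBytes(long bytesUsed, String shard) {
            this.bytesUsed = bytesUsed;
            this.shard = shard;
        }

        @Override
        public int compareTo(ShardBytes other) {
            return Long.compare(other.bytesUsed, bytesUsed);
        }
    }

    public static void main(String[] args) {
        long budget = 100L * 1024 * 1024;        // hypothetical node budget (indices.memory.index_buffer_size)
        long totalWriting = 60L * 1024 * 1024;   // bytes already being moved to disk by earlier refreshes

        List<ShardBytes> shards = Arrays.asList(
                new ShardBytes(60L * 1024 * 1024, "[test][0]"),
                new ShardBytes(50L * 1024 * 1024, "[test][1]"),
                new ShardBytes(10L * 1024 * 1024, "[test][2]"));

        long totalUsed = 0;
        for (ShardBytes s : shards) {
            totalUsed += s.bytesUsed;            // 120 MB in total
        }

        // Throttle only when buffers plus in-flight writes exceed 150% of the budget (180 MB > 150 MB here):
        boolean doThrottle = totalWriting + totalUsed > 1.5 * budget;

        if (totalUsed > budget) {
            PriorityQueue<ShardBytes> queue = new PriorityQueue<>(shards);
            while (totalUsed > budget && queue.isEmpty() == false) {
                ShardBytes largest = queue.poll();
                System.out.println("write indexing buffer of " + largest.shard
                        + (doThrottle ? " and throttle it" : ""));
                totalUsed -= largest.bytesUsed;
            }
        }
        // Prints only "[test][0] ...": freeing its 60 MB brings the total back under the 100 MB budget.
    }
}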

+ 1 - 2
core/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -293,14 +293,13 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
 
         final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
         pluginsService.onIndexModule(indexModule);
-        indexModule.addIndexEventListener(indexingMemoryController);
         for (IndexEventListener listener : builtInListeners) {
             indexModule.addIndexEventListener(listener);
         }
         indexModule.addIndexEventListener(oldShardsStats);
         final IndexEventListener listener = indexModule.freeze();
         listener.beforeIndexCreated(index, idxSettings.getSettings());
-        final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry);
+        final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, mapperRegistry, indexingMemoryController);
         boolean success = false;
         try {
             assert indexService.getIndexEventListener() == listener;

+ 1 - 0
core/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -51,6 +51,7 @@ import org.elasticsearch.index.similarity.SimilarityProvider;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.index.store.IndexStoreConfig;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesWarmer;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;

+ 0 - 46
core/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTests.java

@@ -40,8 +40,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
 
         // VERSION MAP SIZE
         long indexBufferSize = engine.config().getIndexingBufferSize().bytes();
-        long versionMapSize = engine.config().getVersionMapSize().bytes();
-        assertThat(versionMapSize, equalTo((long) (indexBufferSize * 0.25)));
 
         final int iters = between(1, 20);
         for (int i = 0; i < iters; i++) {
@@ -51,14 +49,8 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
             // the full long range here else the assert below fails:
             long gcDeletes = random().nextLong() & (Long.MAX_VALUE >> 11);
 
-            boolean versionMapAsPercent = randomBoolean();
-            double versionMapPercent = randomIntBetween(0, 100);
-            long versionMapSizeInMB = randomIntBetween(10, 20);
-            String versionMapString = versionMapAsPercent ? versionMapPercent + "%" : versionMapSizeInMB + "mb";
-
             Settings build = Settings.builder()
                     .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
-                    .put(EngineConfig.INDEX_VERSION_MAP_SIZE, versionMapString)
                     .build();
             assertEquals(gcDeletes, build.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, null).millis());
 
@@ -71,12 +63,6 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
             assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
 
             indexBufferSize = engine.config().getIndexingBufferSize().bytes();
-            versionMapSize = engine.config().getVersionMapSize().bytes();
-            if (versionMapAsPercent) {
-                assertThat(versionMapSize, equalTo((long) (indexBufferSize * (versionMapPercent / 100))));
-            } else {
-                assertThat(versionMapSize, equalTo(1024 * 1024 * versionMapSizeInMB));
-            }
         }
 
         Settings settings = Settings.builder()
@@ -101,37 +87,5 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
         client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
         assertEquals(engine.getGcDeletesInMillis(), 1000);
         assertTrue(engine.config().isEnableGcDeletes());
-
-        settings = Settings.builder()
-                .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "sdfasfd")
-                .build();
-        try {
-            client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
-            fail("settings update didn't fail, but should have");
-        } catch (IllegalArgumentException e) {
-            // good
-        }
-
-        settings = Settings.builder()
-                .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "-12%")
-                .build();
-        try {
-            client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
-            fail("settings update didn't fail, but should have");
-        } catch (IllegalArgumentException e) {
-            // good
-        }
-
-        settings = Settings.builder()
-                .put(EngineConfig.INDEX_VERSION_MAP_SIZE, "130%")
-                .build();
-        try {
-            client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
-            fail("settings update didn't fail, but should have");
-        } catch (IllegalArgumentException e) {
-            // good
-        }
     }
-
-
 }

+ 5 - 42
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -95,6 +95,7 @@ import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.test.DummyShardLock;
@@ -439,7 +440,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSegmentsWithMergeFlag() throws Exception {
         try (Store store = createStore();
-             Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
+            Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
             Engine.Index index = new Engine.Index(newUid("1"), doc);
             engine.index(index);
@@ -769,7 +770,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSyncedFlush() throws IOException {
         try (Store store = createStore();
-             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
+            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                      new LogByteSizeMergePolicy()), false)) {
             final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@@ -1026,7 +1027,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testForceMerge() throws IOException {
         try (Store store = createStore();
-             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
+            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                      new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
             int numDocs = randomIntBetween(10, 100);
             for (int i = 0; i < numDocs; i++) {
@@ -1465,7 +1466,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testEnableGcDeletes() throws Exception {
         try (Store store = createStore();
-             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
+            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
             engine.config().setEnableGcDeletes(false);
 
             // Add document
@@ -1586,44 +1587,6 @@ public class InternalEngineTests extends ESTestCase {
         assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
     }
 
-    // #10312
-    public void testDeletesAloneCanTriggerRefresh() throws Exception {
-        try (Store store = createStore();
-             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()),
-                     false)) {
-            engine.config().setIndexingBufferSize(new ByteSizeValue(1, ByteSizeUnit.KB));
-            for (int i = 0; i < 100; i++) {
-                String id = Integer.toString(i);
-                ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
-                engine.index(new Engine.Index(newUid(id), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
-            }
-
-            // Force merge so we know all merges are done before we start deleting:
-            engine.forceMerge(true, 1, false, false, false);
-
-            Searcher s = engine.acquireSearcher("test");
-            final long version1 = ((DirectoryReader) s.reader()).getVersion();
-            s.close();
-            for (int i = 0; i < 100; i++) {
-                String id = Integer.toString(i);
-                engine.delete(new Engine.Delete("test", id, newUid(id), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
-            }
-
-            // We must assertBusy because refresh due to version map being full is done in background (REFRESH) thread pool:
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    Searcher s2 = engine.acquireSearcher("test");
-                    long version2 = ((DirectoryReader) s2.reader()).getVersion();
-                    s2.close();
-
-                    // 100 buffered deletes will easily exceed 25% of our 1 KB indexing buffer so it should have forced a refresh:
-                    assertThat(version2, greaterThan(version1));
-                }
-            });
-        }
-    }
-
     public void testMissingTranslog() throws IOException {
         // test that we can force start the engine , even if the translog is missing.
         engine.close();

+ 4 - 3
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -367,12 +367,13 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         client().prepareIndex("test", "test").setSource("{}").get();
         ensureGreen("test");
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
-        Boolean result = indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
-        assertEquals(Boolean.TRUE, result);
+        indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
         assertBusy(() -> {
             IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
             assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
-        });
+            indicesService.indexService("test").getShardOrNull(0).checkIdle(0);
+        });
         IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
         assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
     }

+ 0 - 99
core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

@@ -1,99 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.indices;
-
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
-import org.elasticsearch.test.ESIntegTestCase;
-
-
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class IndexingMemoryControllerIT extends ESIntegTestCase {
-    private long getIWBufferSize(String indexName) {
-        return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes();
-    }
-
-    public void testIndexBufferPushedToEngine() throws InterruptedException {
-        createNode(Settings.builder().put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "100000h",
-                                          IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb",
-                                          IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
-
-        // Create two active indices, sharing 32 MB indexing buffer:
-        prepareCreate("test3").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
-        prepareCreate("test4").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
-
-        ensureGreen();
-
-        index("test3", "type", "1", "f", 1);
-        index("test4", "type", "1", "f", 1);
-
-        // .. then make sure we really pushed the update (16 MB for each) down to the IndexWriter, even if refresh nor flush occurs:
-        if (awaitBusy(() -> getIWBufferSize("test3") == 16*1024*1024) == false) {
-            fail("failed to update shard indexing buffer size for test3 index to 16 MB; got: " + getIWBufferSize("test3"));
-        }
-        if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) {
-            fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4"));
-        }
-
-        client().admin().indices().prepareDelete("test4").get();
-        if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) {
-            fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4"));
-        }
-
-    }
-
-    public void testInactivePushedToShard() throws InterruptedException {
-        createNode(Settings.builder().put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "100ms",
-                IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms",
-                IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
-
-        // Create two active indices, sharing 32 MB indexing buffer:
-        prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
-
-        ensureGreen();
-
-        index("test1", "type", "1", "f", 1);
-
-        // make shard the shard buffer was set to inactive size
-        final ByteSizeValue inactiveBuffer = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
-        if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) {
-            fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1"));
-        }
-    }
-
-    private void createNode(Settings settings) {
-        internalCluster().startNode(Settings.builder()
-                        .put(ClusterName.SETTING, "IndexingMemoryControllerIT")
-                        .put("node.name", "IndexingMemoryControllerIT")
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
-                        .put("http.enabled", false)
-                        .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true) // make sure we get what we set :)
-                        .put(IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms")
-                        .put(settings)
-        );
-    }
-}

+ 225 - 111
core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -18,16 +18,20 @@
  */
 package org.elasticsearch.indices;
 
+import org.apache.lucene.index.DirectoryReader;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -37,88 +41,114 @@ import java.util.concurrent.ScheduledFuture;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 
 public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
 
     static class MockController extends IndexingMemoryController {
 
-        final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
+        // Size of each shard's indexing buffer
+        final Map<IndexShard, Long> indexBufferRAMBytesUsed = new HashMap<>();
 
-        final Map<IndexShard, ByteSizeValue> indexingBuffers = new HashMap<>();
+        // How many bytes this shard is currently moving to disk
+        final Map<IndexShard, Long> writingBytes = new HashMap<>();
 
-        final Map<IndexShard, Long> lastIndexTimeNanos = new HashMap<>();
-        final Set<IndexShard> activeShards = new HashSet<>();
-
-        long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
+        // Shards that are currently throttled
+        final Set<IndexShard> throttled = new HashSet<>();
 
         public MockController(Settings settings) {
             super(Settings.builder()
-                    .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
-                    .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
-                    .put(settings)
-                    .build(),
-                null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
+                            .put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it
+                            .put(settings)
+                            .build(),
+                    null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
         }
 
-        public void deleteShard(IndexShard id) {
-            indexingBuffers.remove(id);
+        public void deleteShard(IndexShard shard) {
+            indexBufferRAMBytesUsed.remove(shard);
+            writingBytes.remove(shard);
         }
 
-        public void assertBuffers(IndexShard id, ByteSizeValue indexing) {
-            assertThat(indexingBuffers.get(id), equalTo(indexing));
+        @Override
+        protected List<IndexShard> availableShards() {
+            return new ArrayList<>(indexBufferRAMBytesUsed.keySet());
         }
 
-        public void assertInactive(IndexShard id) {
-            assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
+        @Override
+        protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
+            return indexBufferRAMBytesUsed.get(shard) + writingBytes.get(shard);
         }
 
         @Override
-        protected long currentTimeInNanos() {
-            return TimeValue.timeValueSeconds(currentTimeSec).nanos();
+        protected long getShardWritingBytes(IndexShard shard) {
+            Long bytes = writingBytes.get(shard);
+            if (bytes == null) {
+                return 0;
+            } else {
+                return bytes;
+            }
         }
 
         @Override
-        protected List<IndexShard> availableShards() {
-            return new ArrayList<>(indexingBuffers.keySet());
+        protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
         }
 
         @Override
-        protected boolean shardAvailable(IndexShard shard) {
-            return indexingBuffers.containsKey(shard);
+        public void writeIndexingBufferAsync(IndexShard shard) {
+            long bytes = indexBufferRAMBytesUsed.put(shard, 0L);
+            writingBytes.put(shard, writingBytes.get(shard) + bytes);
+            indexBufferRAMBytesUsed.put(shard, 0L);
         }
 
         @Override
-        protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize) {
-            indexingBuffers.put(shard, shardIndexingBufferSize);
+        public void activateThrottling(IndexShard shard) {
+            assertTrue(throttled.add(shard));
         }
 
         @Override
-        protected boolean checkIdle(IndexShard shard) {
-            final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
-            Long ns = lastIndexTimeNanos.get(shard);
-            if (ns == null) {
-                return true;
-            } else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) {
-                indexingBuffers.put(shard, INACTIVE);
-                activeShards.remove(shard);
-                return true;
-            } else {
-                return false;
+        public void deactivateThrottling(IndexShard shard) {
+            assertTrue(throttled.remove(shard));
+        }
+
+        public void doneWriting(IndexShard shard) {
+            writingBytes.put(shard, 0L);
+        }
+
+        public void assertBuffer(IndexShard shard, int expectedMB) {
+            Long actual = indexBufferRAMBytesUsed.get(shard);
+            if (actual == null) {
+                actual = 0L;
             }
+            assertEquals(expectedMB * 1024 * 1024, actual.longValue());
         }
 
-        public void incrementTimeSec(int sec) {
-            currentTimeSec += sec;
+        public void assertThrottled(IndexShard shard) {
+            assertTrue(throttled.contains(shard));
+        }
+
+        public void assertNotThrottled(IndexShard shard) {
+            assertFalse(throttled.contains(shard));
+        }
+
+        public void assertWriting(IndexShard shard, int expectedMB) {
+            Long actual = writingBytes.get(shard);
+            if (actual == null) {
+                actual = 0L;
+            }
+            assertEquals(expectedMB * 1024 * 1024, actual.longValue());
         }
 
         public void simulateIndexing(IndexShard shard) {
-            lastIndexTimeNanos.put(shard, currentTimeInNanos());
-            if (indexingBuffers.containsKey(shard) == false) {
-                // First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
-                indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
+            Long bytes = indexBufferRAMBytesUsed.get(shard);
+            if (bytes == null) {
+                bytes = 0L;
+                // First time we are seeing this shard:
+                writingBytes.put(shard, 0L);
             }
-            activeShards.add(shard);
+            // Each doc we index takes up a megabyte!
+            bytes += 1024*1024;
+            indexBufferRAMBytesUsed.put(shard, bytes);
             forceCheck();
         }
 
@@ -134,21 +164,21 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
         IndexService test = indicesService.indexService("test");
 
         MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb").build());
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
         IndexShard shard0 = test.getShard(0);
         controller.simulateIndexing(shard0);
-        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K
+        controller.assertBuffer(shard0, 1);
 
         // add another shard
         IndexShard shard1 = test.getShard(1);
         controller.simulateIndexing(shard1);
-        controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB));
-        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB));
+        controller.assertBuffer(shard0, 1);
+        controller.assertBuffer(shard1, 1);
 
         // remove first shard
         controller.deleteShard(shard0);
         controller.forceCheck();
-        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K
+        controller.assertBuffer(shard1, 1);
 
         // remove second shard
         controller.deleteShard(shard1);
@@ -157,85 +187,48 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
         // add a new one
         IndexShard shard2 = test.getShard(2);
         controller.simulateIndexing(shard2);
-        controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K
+        controller.assertBuffer(shard2, 1);
     }
 
     public void testActiveInactive() {
+
         createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService("test");
 
         MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
-            .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s")
-            .build());
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "5mb")
+                .build());
 
         IndexShard shard0 = test.getShard(0);
         controller.simulateIndexing(shard0);
         IndexShard shard1 = test.getShard(1);
         controller.simulateIndexing(shard1);
-        controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB));
-        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB));
 
-        // index into both shards, move the clock and see that they are still active
+        controller.assertBuffer(shard0, 1);
+        controller.assertBuffer(shard1, 1);
+
         controller.simulateIndexing(shard0);
         controller.simulateIndexing(shard1);
 
-        controller.incrementTimeSec(10);
-        controller.forceCheck();
-
-        // both shards now inactive
-        controller.assertInactive(shard0);
-        controller.assertInactive(shard1);
+        controller.assertBuffer(shard0, 2);
+        controller.assertBuffer(shard1, 2);
 
-        // index into one shard only, see it becomes active
+        // index into one shard only, crosses the 5mb limit, so shard0 is refreshed
         controller.simulateIndexing(shard0);
-        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB));
-        controller.assertInactive(shard1);
-
-        controller.incrementTimeSec(3); // increment but not enough to become inactive
-        controller.forceCheck();
-        controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB));
-        controller.assertInactive(shard1);
-
-        controller.incrementTimeSec(3); // increment some more
-        controller.forceCheck();
-        controller.assertInactive(shard0);
-        controller.assertInactive(shard1);
+        controller.simulateIndexing(shard0);
+        controller.assertBuffer(shard0, 0);
+        controller.assertBuffer(shard1, 2);
 
-        // index some and shard becomes immediately active
         controller.simulateIndexing(shard1);
-        controller.assertInactive(shard0);
-        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB));
-    }
-
-    public void testMinShardBufferSizes() {
-        MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
-            .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
-            .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
-
-        assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB));
-    }
-
-    public void testMaxShardBufferSizes() {
-        MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
-            .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
-            .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
-
-        assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB));
-    }
-
-    public void testRelativeBufferSizes() {
-        MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
-            .build());
-
-        assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB)));
+        controller.simulateIndexing(shard1);
+        controller.assertBuffer(shard1, 4);
+        controller.simulateIndexing(shard1);
+        controller.simulateIndexing(shard1);
+        // shard1 crossed 5 mb and is now cleared:
+        controller.assertBuffer(shard1, 0);
     }
 
-
     public void testMinBufferSizes() {
         MockController controller = new MockController(Settings.builder()
             .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
@@ -246,21 +239,142 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
 
     public void testMaxBufferSizes() {
         MockController controller = new MockController(Settings.builder()
-            .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
-            .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
+                .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
 
         assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
     }
 
-    protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
-        createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
+    public void testThrottling() throws Exception {
+        createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService("test");
+
+        MockController controller = new MockController(Settings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
         IndexShard shard0 = test.getShard(0);
-        controller.simulateIndexing(shard0);
         IndexShard shard1 = test.getShard(1);
+        IndexShard shard2 = test.getShard(2);
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard0);
+        controller.assertBuffer(shard0, 3);
+        controller.simulateIndexing(shard1);
+        controller.simulateIndexing(shard1);
+
+        // We are now using 5 MB, so we should be writing shard0 since it's using the most heap:
+        controller.assertWriting(shard0, 3);
+        controller.assertWriting(shard1, 0);
+        controller.assertBuffer(shard0, 0);
+        controller.assertBuffer(shard1, 2);
+
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard1);
         controller.simulateIndexing(shard1);
-        controller.assertBuffers(shard0, indexBufferSize);
-        controller.assertBuffers(shard1, indexBufferSize);
+
+        // Now we are still writing 3 MB (shard0), and using 5 MB index buffers, so we should now 1) be writing shard1, and 2) be throttling shard1:
+        controller.assertWriting(shard0, 3);
+        controller.assertWriting(shard1, 4);
+        controller.assertBuffer(shard0, 1);
+        controller.assertBuffer(shard1, 0);
+
+        controller.assertNotThrottled(shard0);
+        controller.assertThrottled(shard1);
+
+
+        // More indexing to shard0
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard0);
+        controller.simulateIndexing(shard0);
+
+        // Now we are using 5 MB again, so shard0 should also be writing and now also be throttled:
+        controller.assertWriting(shard0, 8);
+        controller.assertWriting(shard1, 4);
+        controller.assertBuffer(shard0, 0);
+        controller.assertBuffer(shard1, 0);
+
+        controller.assertThrottled(shard0);
+        controller.assertThrottled(shard1);
+
+        // Both shards finally finish writing, and throttling should stop:
+        controller.doneWriting(shard0);
+        controller.doneWriting(shard1);
+        controller.forceCheck();
+        controller.assertNotThrottled(shard0);
+        controller.assertNotThrottled(shard1);
+    }
+
+    // #10312
+    public void testDeletesAloneCanTriggerRefresh() throws Exception {
+        createIndex("index",
+                    Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
+                                      .put(SETTING_NUMBER_OF_REPLICAS, 0)
+                                      .put("index.refresh_interval", -1)
+                                      .build());
+        ensureGreen();
+
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService indexService = indicesService.indexService("index");
+        IndexShard shard = indexService.getShardOrNull(0);
+        assertNotNull(shard);
+
+        for (int i = 0; i < 100; i++) {
+            String id = Integer.toString(i);
+            client().prepareIndex("index", "type", id).setSource("field", "value").get();
+        }
+
+        // Force merge so we know all merges are done before we start deleting:
+        ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
+        assertNoFailures(r);
+
+        // Make a shell of an IMC to check up on indexing buffer usage:
+        Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
+
+        // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
+        IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
+            @Override
+            protected List<IndexShard> availableShards() {
+                return Collections.singletonList(shard);
+            }
+
+            @Override
+            protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
+                return shard.getIndexBufferRAMBytesUsed();
+            }
+
+            @Override
+            protected void writeIndexingBufferAsync(IndexShard shard) {
+                // just do it sync'd for this test
+                shard.writeIndexingBuffer();
+            }
+
+            @Override
+            protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
+                return null;
+            }
+        };
+
+        for (int i = 0; i < 100; i++) {
+            String id = Integer.toString(i);
+            client().prepareDelete("index", "type", id).get();
+        }
+
+        final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed();
+
+        imc.forceCheck();
+
+        // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
+        assertBusy(new Runnable() {
+            @Override
+            public void run() {
+                try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
+                    // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
+                    final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
+                    assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
+                }
+            }
+        });
     }
 }

+ 5 - 1
docs/reference/migration/migrate_3_0.asciidoc

@@ -194,7 +194,6 @@ Previously, there were three settings for the ping timeout: `discovery.zen.initi
 the only setting key for the ping timeout is now `discovery.zen.ping_timeout`. The default value for
 ping timeouts remains at three seconds.
 
-
 ==== Recovery settings
 
 Recovery settings deprecated in 1.x have been removed:
@@ -238,6 +237,11 @@ Please change the setting in your configuration files or in the clusterstate to
 
 The 'default' similarity has been renamed to 'classic'.
 
+==== Indexing settings
+
+`indices.memory.min_shard_index_buffer_size` and `indices.memory.max_shard_index_buffer_size` have been removed: Elasticsearch now lets any one shard use any
+amount of heap, as long as the total indexing-buffer heap used across all shards stays below the node's `indices.memory.index_buffer_size` (default: 10% of the JVM heap).
+
 [[breaking_30_mapping_changes]]
 === Mapping changes
 

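The migration note above amounts to this: the per-shard floor and ceiling are gone, and only the node-wide pool remains configurable. A minimal sketch of node-level settings after this change (illustrative only, not taken from the commit; the `10%` value is simply the documented default):

import org.elasticsearch.common.settings.Settings;

public class NodeIndexingBufferSettings {
    public static void main(String[] args) {
        // Only the node-wide budget is set; indices.memory.min_shard_index_buffer_size and
        // indices.memory.max_shard_index_buffer_size no longer exist after this change.
        Settings nodeSettings = Settings.builder()
                .put("indices.memory.index_buffer_size", "10%")   // shared by every shard on the node
                .build();
        System.out.println(nodeSettings.get("indices.memory.index_buffer_size"));
    }
}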
+ 1 - 8
docs/reference/modules/indices/indexing_buffer.asciidoc

@@ -12,7 +12,7 @@ in the cluster:
 
     Accepts either a percentage or a byte size value. It defaults to `10%`,
     meaning that `10%` of the total heap allocated to a node will be used as the
-    indexing buffer size.
+    indexing buffer size shared across all shards.
 
 `indices.memory.min_index_buffer_size`::
 
@@ -23,10 +23,3 @@ in the cluster:
 
     If the `index_buffer_size` is specified as a percentage, then this
     setting can be used to specify an absolute maximum.  Defaults to unbounded.
-
-`indices.memory.min_shard_index_buffer_size`::
-
-    Sets a hard lower limit for the memory allocated per shard for its own
-    indexing buffer. Defaults to `4mb`.
-
-
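
Taken together, the settings documented in this file resolve to a single node-wide byte budget: `indices.memory.index_buffer_size` (a percentage of the JVM heap, or an absolute size) is optionally clamped by `min_index_buffer_size`/`max_index_buffer_size` when a percentage is given. A small sketch of that resolution (illustrative only; `resolveBudget` is a hypothetical helper, not an Elasticsearch API, and the 4 GB heap and 64 MB floor are made-up values):

public class IndexBufferBudgetSketch {

    // Hypothetical helper: take a fraction of the heap, then clamp it into [minBytes, maxBytes].
    // Per the docs above, the clamp only applies when index_buffer_size is given as a percentage.
    static long resolveBudget(long heapBytes, double fraction, long minBytes, long maxBytes) {
        long budget = (long) (heapBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, budget));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long heap = 4096 * mb;                                   // assume a 4 GB JVM heap
        // index_buffer_size = 10% (the default), 64 MB floor, no ceiling -> about 409 MB shared pool
        long budget = resolveBudget(heap, 0.10, 64 * mb, Long.MAX_VALUE);
        System.out.println(budget / mb + " MB shared indexing buffer");
    }
}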