
Index Merge: Improve internal (segment) merging by not initiating it on doc operations, closes #653.

kimchy committed 14 years ago
commit 6c21c30f31

+ 1 - 1
.idea/runConfigurations/Bootstrap.xml

@@ -7,7 +7,7 @@
       </pattern>
     </extension>
     <option name="MAIN_CLASS_NAME" value="org.elasticsearch.bootstrap.Bootstrap" />
-    <option name="VM_PARAMETERS" value="-server -Xmx1g -Des-foreground=yes -XX:+AggressiveOpts -XX:+UseParNewGC -XX:+UseConcMarkSweepGC" />
+    <option name="VM_PARAMETERS" value="-server -Xmx1g -Des-foreground=yes -XX:+UseParNewGC -XX:+UseConcMarkSweepGC" />
     <option name="PROGRAM_PARAMETERS" value="" />
     <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
     <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />

+ 1 - 1
.idea/runConfigurations/Bootstrap__No_Plugins_.xml

@@ -7,7 +7,7 @@
       </pattern>
     </extension>
     <option name="MAIN_CLASS_NAME" value="org.elasticsearch.bootstrap.Bootstrap" />
-    <option name="VM_PARAMETERS" value="-server -Xmx1g -Des-foreground=yes -XX:+AggressiveOpts -XX:+UseParNewGC -XX:+UseConcMarkSweepGC" />
+    <option name="VM_PARAMETERS" value="-server -Xmx1g -Des-foreground=yes -XX:+UseParNewGC -XX:+UseConcMarkSweepGC" />
     <option name="PROGRAM_PARAMETERS" value="" />
     <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
     <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />

+ 2 - 1
modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java

@@ -50,7 +50,8 @@ public class SingleThreadBulkStress {
 
         Settings settings = settingsBuilder()
                 .put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
-                .put("index.engine.robin.refreshInterval", "-1")
+                .put("index.refresh_interval", "-1")
+                .put("index.merge.async", true)
                 .put("gateway.type", "local")
                 .put(SETTING_NUMBER_OF_SHARDS, 2)
                 .put(SETTING_NUMBER_OF_REPLICAS, 1)
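
For reference, the benchmark now drives the renamed per-index refresh interval and the new async-merge flag introduced in this commit. A minimal sketch of standing up a node with the same knobs (the NodeBuilder / ImmutableSettings calls are assumptions about the client API of this era, not part of the diff):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.node.Node;
    import org.elasticsearch.node.NodeBuilder;

    import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

    public class AsyncMergeNodeExample {
        public static void main(String[] args) {
            // sketch only: index.merge.async defaults to true after this change;
            // setting it to false restores merging inline with document operations
            Settings settings = settingsBuilder()
                    .put("index.refresh_interval", "-1")  // no scheduled refresh while bulk loading
                    .put("index.merge.async", true)       // defer merges to flush / scheduled optimizer
                    .build();
            Node node = NodeBuilder.nodeBuilder().settings(settings).node();
            // ... index documents ...
            node.close();
        }
    }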

+ 30 - 17
modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

@@ -21,7 +21,6 @@ package org.elasticsearch.index.engine.robin;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.LogMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -40,6 +39,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.*;
+import org.elasticsearch.index.merge.policy.EnableMergePolicy;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -628,14 +628,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             throw new FlushNotAllowedEngineException(shardId, "Already flushing...");
         }
 
+        // We can't do prepareCommit here, since we rely on the segment version for the translog version
+
         // call maybeMerge outside of the write lock since it gets called anyhow within commit/refresh
         // and we want not to suffer this cost within the write lock
-        // We can't do prepareCommit here, since we rely on the the segment version for the translog version
+        // only do it if we don't have async merging going on; otherwise, we know that we won't do any
+        // merge operation
         try {
+            if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                ((EnableMergePolicy) indexWriter.getMergePolicy()).enableMerge();
+            }
             indexWriter.maybeMerge();
         } catch (Exception e) {
             flushing.set(false);
             throw new FlushFailedEngineException(shardId, e);
+        } finally {
+            // don't allow merge when committing under write lock
+            if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                ((EnableMergePolicy) indexWriter.getMergePolicy()).disableMerge();
+            }
         }
         rwl.writeLock().lock();
         try {
@@ -680,7 +691,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             rwl.writeLock().unlock();
             flushing.set(false);
         }
-        // we flush anyhow before...
+        // we refresh anyhow before...
 //        if (flush.refresh()) {
 //            refresh(new Refresh(false));
 //        }
@@ -693,31 +704,33 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
                 if (indexWriter == null) {
                     throw new EngineClosedException(shardId);
                 }
-                int maxNumberOfSegments = optimize.maxNumSegments();
-                if (maxNumberOfSegments == -1) {
-                    // not set, optimize down to half the configured number of segments
-                    if (indexWriter.getMergePolicy() instanceof LogMergePolicy) {
-                        maxNumberOfSegments = ((LogMergePolicy) indexWriter.getMergePolicy()).getMergeFactor() / 2;
-                        if (maxNumberOfSegments < 0) {
-                            maxNumberOfSegments = 1;
-                        }
-                    }
+                if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                    ((EnableMergePolicy) indexWriter.getMergePolicy()).enableMerge();
                 }
                 if (optimize.onlyExpungeDeletes()) {
-                    indexWriter.expungeDeletes(optimize.waitForMerge());
+                    indexWriter.expungeDeletes(false);
+                } else if (optimize.maxNumSegments() <= 0) {
+                    indexWriter.maybeMerge();
                 } else {
-                    indexWriter.optimize(maxNumberOfSegments, optimize.waitForMerge());
+                    indexWriter.optimize(optimize.maxNumSegments(), false);
                 }
-                // once we did the optimization, we are "dirty" since we removed deletes potentially which
-                // affects TermEnum
-                dirty = true;
             } catch (Exception e) {
                 throw new OptimizeFailedEngineException(shardId, e);
             } finally {
+                if (indexWriter != null && indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                    ((EnableMergePolicy) indexWriter.getMergePolicy()).disableMerge();
+                }
                 rwl.readLock().unlock();
                 optimizeMutex.set(false);
             }
         }
+        // wait for the merges outside of the read lock
+        if (optimize.waitForMerge()) {
+            indexWriter.waitForMerges();
+        }
+        // once we did the optimization, we are "dirty" since we potentially removed deletes, which
+        // affects TermEnum
+        dirty = true;
         if (optimize.flush()) {
             flush(new Flush());
         }
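
Distilled, the bracket that flush and optimize now share looks like the sketch below: the calling thread opts into merging only around its own maybeMerge / optimize call and opts back out in a finally block, so document-operation threads are never affected (a sketch only; the MergeBracket helper and its name are illustrative, with indexWriter standing in for the engine's writer):

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.MergePolicy;
    import org.elasticsearch.index.merge.policy.EnableMergePolicy;

    // illustrative helper showing the enable/merge/disable bracket used above
    final class MergeBracket {
        static void mergeNow(IndexWriter indexWriter) throws IOException {
            MergePolicy policy = indexWriter.getMergePolicy();
            try {
                if (policy instanceof EnableMergePolicy) {
                    ((EnableMergePolicy) policy).enableMerge();   // opt this thread in
                }
                indexWriter.maybeMerge();                         // policy may now select merges
            } finally {
                if (policy instanceof EnableMergePolicy) {
                    ((EnableMergePolicy) policy).disableMerge();  // never leave the thread opted in
                }
            }
        }
    }

Note also that waitForMerges() is now called outside the read lock, so an optimize with waitForMerge=true no longer holds the lock while background merges complete.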

+ 38 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/index/merge/policy/EnableMergePolicy.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.merge.policy;
+
+/**
+ * Allows control over whether merging is enabled on the current thread. Defaults to
+ * not being enabled.
+ *
+ * <p>This allows us to disable merging for operations like adding docs or refresh (which might block
+ * if no merge threads are available) and to perform it on flush (for example) or on an explicit API call.
+ *
+ * @author kimchy (shay.banon)
+ */
+public interface EnableMergePolicy {
+
+    boolean isMergeEnabled();
+
+    void enableMerge();
+
+    void disableMerge();
+}
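
Because the flag is per-thread and defaults to false, an indexing thread that never calls enableMerge() can add or delete documents without the policy ever selecting a merge, while flush or the scheduled optimizer enables it only around its own call. A toy, self-contained illustration of that thread-local default (plain Java, no Lucene types; the class and field names are illustrative):

    // toy illustration of the per-thread default the implementations below rely on
    public class ThreadLocalGateDemo {
        static final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
            @Override protected Boolean initialValue() {
                return Boolean.FALSE;   // merging is off unless this thread opted in
            }
        };

        public static void main(String[] args) throws InterruptedException {
            enableMerge.set(Boolean.TRUE);   // e.g. the flush thread opting in
            Thread indexer = new Thread(new Runnable() {
                @Override public void run() {
                    // a doc-op thread never set the flag, so it still sees FALSE here
                    System.out.println("indexer thread sees mergeEnabled=" + enableMerge.get());
                }
            });
            indexer.start();
            indexer.join();
            System.out.println("flush thread sees mergeEnabled=" + enableMerge.get());  // TRUE
        }
    }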

+ 65 - 5
modules/elasticsearch/src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java

@@ -19,8 +19,7 @@
 
 package org.elasticsearch.index.merge.policy;
 
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.LogByteSizeMergePolicy;
+import org.apache.lucene.index.*;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -28,6 +27,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.store.Store;
 
+import java.io.IOException;
+import java.util.Set;
+
 /**
  * @author kimchy (shay.banon)
  */
@@ -38,6 +40,7 @@ public class LogByteSizeMergePolicyProvider extends AbstractIndexShardComponent
     private final int mergeFactor;
     private final int maxMergeDocs;
     private final boolean calibrateSizeByDeletes;
+    private boolean asyncMerge;
 
     @Inject public LogByteSizeMergePolicyProvider(Store store) {
         super(store.shardId(), store.indexSettings());
@@ -48,12 +51,18 @@ public class LogByteSizeMergePolicyProvider extends AbstractIndexShardComponent
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogByteSizeMergePolicy.DEFAULT_MERGE_FACTOR);
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogByteSizeMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}] calibrate_size_by_deletes[{}]",
-                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes);
+        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
+        logger.debug("using [log_bytes_size] merge policy with merge_factor[{}], min_merge_size[{}], max_merge_size[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
+                mergeFactor, minMergeSize, maxMergeSize, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
     }
 
     @Override public LogByteSizeMergePolicy newMergePolicy(IndexWriter indexWriter) {
-        LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy(indexWriter);
+        LogByteSizeMergePolicy mergePolicy;
+        if (asyncMerge) {
+            mergePolicy = new EnableMergeLogByteSizeMergePolicy(indexWriter);
+        } else {
+            mergePolicy = new LogByteSizeMergePolicy(indexWriter);
+        }
         mergePolicy.setMinMergeMB(minMergeSize.mbFrac());
         mergePolicy.setMaxMergeMB(maxMergeSize.mbFrac());
         mergePolicy.setMergeFactor(mergeFactor);
@@ -61,4 +70,55 @@ public class LogByteSizeMergePolicyProvider extends AbstractIndexShardComponent
         mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
         return mergePolicy;
     }
+
+    public static class EnableMergeLogByteSizeMergePolicy extends LogByteSizeMergePolicy implements EnableMergePolicy {
+
+        private final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
+            @Override protected Boolean initialValue() {
+                return Boolean.FALSE;
+            }
+        };
+
+        public EnableMergeLogByteSizeMergePolicy(IndexWriter writer) {
+            super(writer);
+        }
+
+        @Override public void enableMerge() {
+            enableMerge.set(Boolean.TRUE);
+        }
+
+        @Override public void disableMerge() {
+            enableMerge.set(Boolean.FALSE);
+        }
+
+        @Override public boolean isMergeEnabled() {
+            return enableMerge.get() == Boolean.TRUE;
+        }
+
+        @Override public void close() {
+            enableMerge.remove();
+            super.close();
+        }
+
+        @Override public MergeSpecification findMerges(SegmentInfos infos) throws IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMerges(infos);
+        }
+
+        @Override public MergeSpecification findMergesToExpungeDeletes(SegmentInfos segmentInfos) throws CorruptIndexException, IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMergesToExpungeDeletes(segmentInfos);
+        }
+
+        @Override public MergeSpecification findMergesForOptimize(SegmentInfos infos, int maxNumSegments, Set<SegmentInfo> segmentsToOptimize) throws IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMergesForOptimize(infos, maxNumSegments, segmentsToOptimize);
+        }
+    }
 }

+ 65 - 5
modules/elasticsearch/src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java

@@ -19,13 +19,15 @@
 
 package org.elasticsearch.index.merge.policy;
 
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.LogDocMergePolicy;
+import org.apache.lucene.index.*;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.store.Store;
 
+import java.io.IOException;
+import java.util.Set;
+
 /**
  * @author kimchy (shay.banon)
  */
@@ -35,6 +37,7 @@ public class LogDocMergePolicyProvider extends AbstractIndexShardComponent imple
     private final int maxMergeDocs;
     private final int mergeFactor;
     private final boolean calibrateSizeByDeletes;
+    private boolean asyncMerge;
 
     @Inject public LogDocMergePolicyProvider(Store store) {
         super(store.shardId(), store.indexSettings());
@@ -44,16 +47,73 @@ public class LogDocMergePolicyProvider extends AbstractIndexShardComponent imple
         this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
         this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
         this.calibrateSizeByDeletes = componentSettings.getAsBoolean("calibrate_size_by_deletes", true);
-        logger.debug("using [log_doc] merge policy with merge_factor[{}] min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}]",
-                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes);
+        this.asyncMerge = indexSettings.getAsBoolean("index.merge.async", true);
+        logger.debug("using [log_doc] merge policy with merge_factor[{}], min_merge_docs[{}], max_merge_docs[{}], calibrate_size_by_deletes[{}], async_merge[{}]",
+                mergeFactor, minMergeDocs, maxMergeDocs, calibrateSizeByDeletes, asyncMerge);
     }
 
     @Override public LogDocMergePolicy newMergePolicy(IndexWriter indexWriter) {
-        LogDocMergePolicy mergePolicy = new LogDocMergePolicy(indexWriter);
+        LogDocMergePolicy mergePolicy;
+        if (asyncMerge) {
+            mergePolicy = new EnableMergeLogDocMergePolicy(indexWriter);
+        } else {
+            mergePolicy = new LogDocMergePolicy(indexWriter);
+        }
         mergePolicy.setMinMergeDocs(minMergeDocs);
         mergePolicy.setMaxMergeDocs(maxMergeDocs);
         mergePolicy.setMergeFactor(mergeFactor);
         mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
         return mergePolicy;
     }
+
+    public static class EnableMergeLogDocMergePolicy extends LogDocMergePolicy implements EnableMergePolicy {
+
+        private final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
+            @Override protected Boolean initialValue() {
+                return Boolean.FALSE;
+            }
+        };
+
+        public EnableMergeLogDocMergePolicy(IndexWriter writer) {
+            super(writer);
+        }
+
+        @Override public void enableMerge() {
+            enableMerge.set(Boolean.TRUE);
+        }
+
+        @Override public void disableMerge() {
+            enableMerge.set(Boolean.FALSE);
+        }
+
+        @Override public boolean isMergeEnabled() {
+            return enableMerge.get() == Boolean.TRUE;
+        }
+
+        @Override public void close() {
+            enableMerge.remove();
+            super.close();
+        }
+
+        @Override public MergeSpecification findMerges(SegmentInfos infos) throws IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMerges(infos);
+        }
+
+        @Override public MergeSpecification findMergesToExpungeDeletes(SegmentInfos segmentInfos) throws CorruptIndexException, IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMergesToExpungeDeletes(segmentInfos);
+        }
+
+        @Override public MergeSpecification findMergesForOptimize(SegmentInfos infos, int maxNumSegments, Set<SegmentInfo> segmentsToOptimize) throws IOException {
+            if (enableMerge.get() == Boolean.FALSE) {
+                return null;
+            }
+            return super.findMergesForOptimize(infos, maxNumSegments, segmentsToOptimize);
+        }
+    }
 }

+ 12 - 4
modules/elasticsearch/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java

@@ -19,12 +19,10 @@
 
 package org.elasticsearch.index.merge.scheduler;
 
-import org.apache.lucene.index.ConcurrentMergeScheduler;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.*;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.merge.policy.EnableMergePolicy;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
@@ -54,6 +52,16 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
 
     private class CustomConcurrentMergeScheduler extends ConcurrentMergeScheduler {
 
+        @Override public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
+            // if merge is not enabled, don't do any merging...
+            if (writer.getMergePolicy() instanceof EnableMergePolicy) {
+                if (!((EnableMergePolicy) writer.getMergePolicy()).isMergeEnabled()) {
+                    return;
+                }
+            }
+            super.merge(writer);
+        }
+
         @Override protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
             MergeThread thread = super.getMergeThread(writer, merge);
             thread.setName("[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName());

+ 19 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java

@@ -19,14 +19,19 @@
 
 package org.elasticsearch.index.merge.scheduler;
 
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.index.SerialMergeScheduler;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.merge.policy.EnableMergePolicy;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.io.IOException;
+
 /**
  * @author kimchy (shay.banon)
  */
@@ -38,6 +43,19 @@ public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent im
     }
 
     @Override public MergeScheduler newMergeScheduler() {
-        return new SerialMergeScheduler();
+        return new CustomSerialMergeScheduler();
+    }
+
+    public static class CustomSerialMergeScheduler extends SerialMergeScheduler {
+
+        @Override public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
+            // if merge is not enabled, don't do any merging...
+            if (writer.getMergePolicy() instanceof EnableMergePolicy) {
+                if (!((EnableMergePolicy) writer.getMergePolicy()).isMergeEnabled()) {
+                    return;
+                }
+            }
+            super.merge(writer);
+        }
     }
 }

+ 45 - 5
modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

@@ -90,7 +90,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
 
     private volatile IndexShardState state;
 
-    private ScheduledFuture refreshScheduledFuture;
+    private volatile ScheduledFuture refreshScheduledFuture;
+
+    private volatile ScheduledFuture optimizeScheduleFuture;
 
     private volatile ShardRouting shardRouting;
 
@@ -212,7 +214,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
                 checkIndex(true);
             }
             engine.start();
-            scheduleRefresherIfNeeded();
+            startScheduledTasksIfNeeded();
             logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.STARTED, reason);
             state = IndexShardState.STARTED;
         }
@@ -459,6 +461,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
                     refreshScheduledFuture.cancel(true);
                     refreshScheduledFuture = null;
                 }
+                if (optimizeScheduleFuture != null) {
+                    optimizeScheduleFuture.cancel(true);
+                    optimizeScheduleFuture = null;
+                }
             }
             logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
             state = IndexShardState.CLOSED;
@@ -499,7 +505,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
             logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
             state = IndexShardState.STARTED;
         }
-        scheduleRefresherIfNeeded();
+        startScheduledTasksIfNeeded();
         engine.refresh(new Engine.Refresh(true));
 
         // clear unreferenced files
@@ -562,14 +568,25 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         }
     }
 
-    private void scheduleRefresherIfNeeded() {
+    private void startScheduledTasksIfNeeded() {
         if (engine instanceof ScheduledRefreshableEngine) {
             TimeValue refreshInterval = ((ScheduledRefreshableEngine) engine).refreshInterval();
             if (refreshInterval.millis() > 0) {
                 refreshScheduledFuture = threadPool.scheduleWithFixedDelay(new EngineRefresher(), refreshInterval);
-                logger.debug("Scheduling refresher every {}", refreshInterval);
+                logger.debug("scheduling refresher every {}", refreshInterval);
+            } else {
+                logger.debug("scheduled refresher disabled");
             }
         }
+        // since we can do async merging, merging will no longer be triggered explicitly when indexing (adding / deleting docs), only when flushing,
+        // so make sure we periodically call it
+        TimeValue optimizeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(30));
+        if (optimizeInterval.millis() > 0) {
+            optimizeScheduleFuture = threadPool.scheduleWithFixedDelay(new EngineOptimizer(), optimizeInterval);
+            logger.debug("scheduling optimizer / merger every {}", optimizeInterval);
+        } else {
+            logger.debug("scheduled optimizer / merger disabled");
+        }
     }
 
     private Query filterByTypesIfNeeded(Query query, String[] types) {
@@ -601,6 +618,29 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         }
     }
 
+    private class EngineOptimizer implements Runnable {
+        @Override public void run() {
+            try {
+                // -1 means maybe merge
+                engine.optimize(new Engine.Optimize().maxNumSegments(-1).waitForMerge(false));
+            } catch (EngineClosedException e) {
+                // we are being closed, ignore
+            } catch (OptimizeFailedEngineException e) {
+                if (e.getCause() instanceof InterruptedException) {
+                    // ignore, we are being shutdown
+                } else if (e.getCause() instanceof ClosedByInterruptException) {
+                    // ignore, we are being shutdown
+                } else if (e.getCause() instanceof ThreadInterruptedException) {
+                    // ignore, we are being shutdown
+                } else {
+                    logger.warn("Failed to perform scheduled engine optimize/merge", e);
+                }
+            } catch (Exception e) {
+                logger.warn("Failed to perform scheduled engine optimize/merge", e);
+            }
+        }
+    }
+
     private void checkIndex(boolean throwException) throws IndexShardException {
         try {
             if (!IndexReader.indexExists(store.directory())) {
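
The scheduled EngineOptimizer above only requests maybeMerge (maxNumSegments of -1 maps to the maxNumSegments() <= 0 branch in RobinEngine.optimize), so it picks up merges that accumulated since the last flush rather than forcing a full optimize. The interval comes from index.merge.async_interval (default 30s), and the millis() > 0 check means a non-positive value disables the task, mirroring index.refresh_interval. A sketch of the per-index settings involved (a sketch only; treating "-1" as "disabled" is inferred from that check, not stated elsewhere in this diff):

    import org.elasticsearch.common.settings.Settings;

    import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

    public class ScheduledMergeSettings {
        // run the periodic maybeMerge more often than the 30s default
        public static Settings everyTenSeconds() {
            return settingsBuilder()
                    .put("index.merge.async_interval", "10s")
                    .build();
        }

        // disable the periodic merger entirely (merges then happen only on flush / explicit optimize)
        public static Settings disabled() {
            return settingsBuilder()
                    .put("index.merge.async_interval", "-1")
                    .build();
        }
    }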

+ 1 - 1
modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java

@@ -301,7 +301,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
                     if (fullRecovery || !isPersistentStorage()) {
                         assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), equalTo(0l));
                     } else {
-                        assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), greaterThan(shardStatus.gatewayRecoveryStatus().indexSize().bytes() - 4098 /* segments file */));
+                        assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), greaterThan(shardStatus.gatewayRecoveryStatus().indexSize().bytes() - 8196 /* segments file and others */));
                     }
                 }
             }