Browse Source

Skip shard refreshes if shard is `search idle` (#27500)

Today we refresh automatically in the background, by default every second.
This default behavior has a significant impact on indexing performance
if the refreshes are not needed.
This change introduces a notion of a shard being `search idle` which a
shard transitions to after (default) `30s` without any access to an
external searcher. Once a shard is search idle all scheduled refreshes
will be skipped unless there are any refresh listeners registered.
If a search happens on a `search idle` shard the search request is _parked_
on a refresh listener and will be executed once the next scheduled refresh
occurs. This will also turn the shard into the `non-idle` state immediately.

This behavior is only applied if there is no explicit refresh interval set.
Simon Willnauer 7 years ago
parent
commit
f23ed6188d

+ 15 - 0
core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

@@ -33,8 +33,10 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.internal.AliasFilter;
@@ -86,6 +88,19 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
         }
     }
 
+    @Override
+    protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
+        IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
+        IndexShard indexShard = indexService.getShard(shardId.id());
+        indexShard.awaitShardSearchActive(b -> {
+            try {
+                super.asyncShardOperation(request, shardId, listener);
+            } catch (Exception ex) {
+                listener.onFailure(ex);
+            }
+        });
+    }
+
     @Override
     protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException {
         ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,

+ 20 - 1
core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -19,13 +19,13 @@
 
 package org.elasticsearch.action.get;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.routing.Preference;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
@@ -38,6 +38,8 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
+
 /**
  * Performs the get operation.
  */
@@ -76,6 +78,23 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
         }
     }
 
+    @Override
+    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        IndexShard indexShard = indexService.getShard(shardId.id());
+        if (request.realtime()) { // not tied to a refresh cycle: skip the wait, but still fork off the calling thread
+            super.asyncShardOperation(request, shardId, listener);
+        } else { // non-realtime reads wait until the shard is search active again before forking
+            indexShard.awaitShardSearchActive(b -> {
+                try {
+                    super.asyncShardOperation(request, shardId, listener);
+                } catch (Exception ex) {
+                    listener.onFailure(ex);
+                }
+            });
+        }
+    }
+
     @Override
     protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

+ 36 - 4
core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -38,6 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
@@ -47,6 +48,8 @@ import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -78,7 +81,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
         if (!isSubAction()) {
             transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
         }
-        transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
+        transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
     }
 
     /**
@@ -97,6 +100,19 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
 
     protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
 
+    protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
+        threadPool.executor(this.executor).execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                listener.onResponse(shardOperation(request, shardId));
+            }
+        });
+    }
     protected abstract Response newResponse();
 
     protected abstract boolean resolveIndex(Request request);
@@ -291,11 +307,27 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
             if (logger.isTraceEnabled()) {
                 logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
             }
-            Response response = shardOperation(request, request.internalShardId);
-            channel.sendResponse(response);
+            asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
+                @Override
+                public void onResponse(Response response) {
+                    try {
+                        channel.sendResponse(response);
+                    } catch (IOException e) {
+                        onFailure(e);
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    try {
+                        channel.sendResponse(e);
+                    } catch (IOException e1) {
+                        throw new UncheckedIOException(e1);
+                    }
+                }
+            });
         }
     }
-
     /**
      * Internal request class that gets built on each node. Holds the original request plus additional info.
      */

+ 20 - 0
core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.termvectors;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@@ -37,6 +38,8 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
+
 /**
  * Performs the get operation.
  */
@@ -82,6 +85,23 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
         }
     }
 
+    @Override
+    protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, ActionListener<TermVectorsResponse> listener) throws IOException {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        IndexShard indexShard = indexService.getShard(shardId.id());
+        if (request.realtime()) { // realtime requests are not subject to refresh cycles: skip the wait, but still fork off the calling thread
+            super.asyncShardOperation(request, shardId, listener);
+        } else { // non-realtime requests wait until the shard is search active again before forking
+            indexShard.awaitShardSearchActive(b -> {
+                try {
+                    super.asyncShardOperation(request, shardId, listener);
+                } catch (Exception ex) {
+                    listener.onFailure(ex);
+                }
+            });
+        }
+    }
+
     @Override
     protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

+ 1 - 1
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -36,7 +36,6 @@ import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.FsDirectoryService;
 import org.elasticsearch.index.store.Store;
@@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
         IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
         IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
+        IndexSettings.INDEX_SEARCH_IDLE_AFTER,
         IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
         FieldMapper.IGNORE_MALFORMED_SETTING,
         FieldMapper.COERCE_SETTING,

+ 29 - 11
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.NodeEnvironment;
@@ -624,6 +625,27 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 }
             }
             if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
+                // once we change the refresh interval we schedule yet another refresh
+                // to ensure we are in a clean and predictable state.
+                // it doesn't matter if we move from or to <code>-1</code>  in both cases we want
+                // docs to become visible immediately. This also flushes all pending indexing / search requests
+                // that are waiting for a refresh.
+                threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn("forced refresh failed after interval change", e);
+                    }
+
+                    @Override
+                    protected void doRun() throws Exception {
+                        maybeRefreshEngine(true);
+                    }
+
+                    @Override
+                    public boolean isForceExecution() {
+                        return true;
+                    }
+                });
                 rescheduleRefreshTasks();
             }
             final Translog.Durability durability = indexSettings.getTranslogDurability();
@@ -686,17 +708,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         }
     }
 
-    private void maybeRefreshEngine() {
-        if (indexSettings.getRefreshInterval().millis() > 0) {
+    private void maybeRefreshEngine(boolean force) {
+        if (indexSettings.getRefreshInterval().millis() > 0 || force) {
             for (IndexShard shard : this.shards.values()) {
-                if (shard.isReadAllowed()) {
-                    try {
-                        if (shard.isRefreshNeeded()) {
-                            shard.refresh("schedule");
-                        }
-                    } catch (IndexShardClosedException | AlreadyClosedException ex) {
-                        // fine - continue;
-                    }
+                try {
+                    shard.scheduledRefresh();
+                } catch (IndexShardClosedException | AlreadyClosedException ex) {
+                    // fine - continue;
                 }
             }
         }
@@ -896,7 +914,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
 
         @Override
         protected void runInternal() {
-            indexService.maybeRefreshEngine();
+            indexService.maybeRefreshEngine(false);
         }
 
         @Override

+ 21 - 0
core/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -62,6 +62,9 @@ public final class IndexSettings {
     public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
         Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
             Property.IndexScope);
+    public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
+        Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
+            TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
     public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING =
         new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(),
             (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope);
@@ -262,6 +265,8 @@ public final class IndexSettings {
     private volatile int maxNgramDiff;
     private volatile int maxShingleDiff;
     private volatile boolean TTLPurgeDisabled;
+    private volatile TimeValue searchIdleAfter;
+
     /**
      * The maximum number of refresh listeners allows on this shard.
      */
@@ -371,6 +376,7 @@ public final class IndexSettings {
         maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
         this.mergePolicyConfig = new MergePolicyConfig(logger, this);
         this.indexSortConfig = new IndexSortConfig(this);
+        searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
         singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered
         if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) {
             throw new AssertionError(index.toString()  + "multiple types are only allowed on pre 6.x indices but version is: ["
@@ -411,8 +417,11 @@ public final class IndexSettings {
         scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
         scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
         scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
     }
 
+    private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
+
     private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
         this.flushThresholdSize = byteSizeValue;
     }
@@ -752,4 +761,16 @@ public final class IndexSettings {
     }
 
     public IndexScopedSettings getScopedSettings() { return scopedSettings;}
+
+    /**
+     * Returns true iff the refresh setting exists or in other words is explicitly set.
+     */
+    public boolean isExplicitRefresh() {
+        return INDEX_REFRESH_INTERVAL_SETTING.exists(settings);
+    }
+
+    /**
+     * Returns the time that an index shard becomes search idle unless it's accessed in between
+     */
+    public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
 }

+ 81 - 6
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -154,6 +154,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -236,6 +237,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      */
     private final RefreshListeners refreshListeners;
 
+    private final AtomicLong lastSearcherAccess = new AtomicLong();
+    private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
+
     public IndexShard(
             ShardRouting shardRouting,
             IndexSettings indexSettings,
@@ -300,6 +304,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         searcherWrapper = indexSearcherWrapper;
         primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
         refreshListeners = buildRefreshListeners();
+        lastSearcherAccess.set(threadPool.relativeTimeInMillis());
         persistMetadata(path, indexSettings, shardRouting, null, logger);
     }
 
@@ -867,6 +872,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         long numDeletedDocs = 0;
         long sizeInBytes = 0;
         try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) {
+            // we don't wait for pending refreshes here since it's a stats call; instead we only mark it as accessed, which will cause
+            // the next scheduled refresh to go through and refresh the stats as well
+            markSearcherAccessed();
             for (LeafReaderContext reader : searcher.reader().leaves()) {
                 // we go on the segment level here to get accurate numbers
                 final SegmentReader segmentReader = Lucene.segmentReader(reader.reader());
@@ -963,6 +971,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public CompletionStats completionStats(String... fields) {
         CompletionStats completionStats = new CompletionStats();
         try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) {
+            // we don't wait for pending refreshes here since it's a stats call; instead we only mark it as accessed, which will cause
+            // the next scheduled refresh to go through and refresh the stats as well
+            markSearcherAccessed();
             completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields));
         }
         return completionStats;
@@ -1132,6 +1143,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
     }
 
+    private void markSearcherAccessed() {
+        lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
+    }
+
     private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
         readAllowed();
         final Engine engine = getEngine();
@@ -2433,14 +2448,74 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher *or* there are pending
-     * refresh listeners.
-     * Otherwise <code>false</code>.
+     * Executes a scheduled refresh if necessary.
      *
-     * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed
+     * @return <code>true</code> iff the engine got refreshed otherwise <code>false</code>
+     */
+    public boolean scheduledRefresh() {
+        boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
+        if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
+            if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
+                && isSearchIdle() && indexSettings.isExplicitRefresh() == false) {
+                // lets skip this refresh since we are search idle and
+                // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
+                // cause the next schedule to refresh.
+                setRefreshPending();
+                return false;
+            } else {
+                refresh("schedule");
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if this shard is search idle
+     */
+    final boolean isSearchIdle() {
+        return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
+    }
+
+    /**
+     * Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds.
+     */
+    final long getLastSearcherAccess() {
+        return lastSearcherAccess.get();
+    }
+
+    private void setRefreshPending() {
+        Engine engine = getEngine();
+        Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation();
+        Translog.Location location;
+        do {
+            location = this.pendingRefreshLocation.get();
+            if (location != null && lastWriteLocation.compareTo(location) <= 0) {
+                break;
+            }
+        } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
+    }
+
+    /**
+     * Registers the given listener and invokes it once the shard is active again and the
+     * pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be
+     * invoked immediately.
+     * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with
+     *                 <code>true</code> if the listener was registered to wait for a refresh.
      */
-    public boolean isRefreshNeeded() {
-        return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded());
+    public final void awaitShardSearchActive(Consumer<Boolean> listener) {
+        if (isSearchIdle()) {
+            markSearcherAccessed(); // move the shard into non-search idle
+        }
+        final Translog.Location location = pendingRefreshLocation.get();
+        if (location != null) {
+            addRefreshListener(location, (b) -> {
+                pendingRefreshLocation.compareAndSet(location, null);
+                listener.accept(true);
+            });
+        } else {
+            listener.accept(false);
+        }
     }
 
     /**

+ 26 - 12
core/src/main/java/org/elasticsearch/search/SearchService.java

@@ -582,6 +582,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         throws IOException {
         return createSearchContext(request, timeout, true);
     }
+
     private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout,
                                                      boolean assertAsyncActions)
             throws IOException {
@@ -979,22 +980,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
      * The action listener is guaranteed to be executed on the search thread-pool
      */
     private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
+        ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
+            threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    listener.onResponse(request);
+                }
+            }), listener::onFailure);
+        IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId());
+        if (shardOrNull != null) {
+            // now we need to check if there is a pending refresh and register
+                ActionListener<Rewriteable> finalListener = actionListener;
+                actionListener = ActionListener.wrap(r ->
+                        shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure);
+        }
         // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
         // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
         // adding a lot of overhead
-        Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
-            ActionListener.wrap(r ->
-                    threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
+        Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
+
 
-                        @Override
-                        protected void doRun() throws Exception {
-                            listener.onResponse(request);
-                        }
-                    }), listener::onFailure));
     }
 
     /**
@@ -1003,4 +1013,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
         return indicesService.getRewriteContext(nowInMillis);
     }
+
+    public IndicesService getIndicesService() {
+        return indicesService;
+    }
 }

+ 98 - 23
core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -18,14 +18,14 @@
  */
 package org.elasticsearch.index.shard;
 
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterInfoService;
@@ -41,11 +41,11 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -56,11 +56,6 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.flush.FlushStats;
-import org.elasticsearch.index.mapper.IdFieldMapper;
-import org.elasticsearch.index.mapper.Mapping;
-import org.elasticsearch.index.mapper.ParseContext;
-import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
@@ -82,8 +77,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
@@ -97,6 +94,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
 import static org.hamcrest.Matchers.equalTo;
 
 public class IndexShardIT extends ESSingleNodeTestCase {
@@ -106,21 +104,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         return pluginList(InternalSettingsPlugin.class);
     }
 
-    private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo,
-                                              ParseContext.Document document, BytesReference source, XContentType xContentType,
-                                              Mapping mappingUpdate) {
-        Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
-        Field versionField = new NumericDocValuesField("_version", 0);
-        SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
-        document.add(uidField);
-        document.add(versionField);
-        document.add(seqID.seqNo);
-        document.add(seqID.seqNoDocValue);
-        document.add(seqID.primaryTerm);
-        return new ParsedDocument(versionField, seqID, id, type, routing,
-                Collections.singletonList(document), source, xContentType, mappingUpdate);
-    }
-
     public void testLockTryingToDelete() throws Exception {
         createIndex("test");
         ensureGreen();
@@ -550,4 +533,96 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE);
         return shardRouting;
     }
+
+    /**
+     * Checks that indexed documents become visible to searches without an explicit refresh interval, for a randomly
+     * chosen {@code index.search.idle.after} value (unset, zero, or a random value of at most one second).
+     * A background thread keeps searching until it observes exactly the number of documents that get indexed.
+     */
+    public void testAutomaticRefresh() throws InterruptedException {
+        TimeValue idleAfter = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
+        Settings.Builder settingsBuilder = Settings.builder();
+        if (idleAfter != null) {
+            settingsBuilder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), idleAfter);
+        }
+        IndexService indexService = createIndex("test", settingsBuilder.build());
+        assertFalse(indexService.getIndexSettings().isExplicitRefresh());
+        ensureGreen();
+        // the target starts at Integer.MAX_VALUE so the searcher keeps polling until the real doc count is published
+        AtomicInteger expectedDocs = new AtomicInteger(Integer.MAX_VALUE);
+        CountDownLatch searcherStarted = new CountDownLatch(1);
+        Thread searcher = new Thread(() -> {
+            searcherStarted.countDown();
+            while (true) {
+                SearchResponse response = client().prepareSearch().get();
+                if (response.getHits().totalHits == expectedDocs.get()) {
+                    return;
+                }
+            }
+        });
+        searcher.start();
+        searcherStarted.await();
+        assertNoSearchHits(client().prepareSearch().get());
+        int docCount = scaledRandomIntBetween(25, 100);
+        expectedDocs.set(docCount);
+        CountDownLatch pendingIndexOps = new CountDownLatch(docCount);
+        client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
+        pendingIndexOps.countDown(); // the first doc was indexed with a blocking call above
+        IndexShard shard = indexService.getShard(0);
+        boolean refreshed = shard.scheduledRefresh();
+        if (idleAfter == null) {
+            // with null we are guaranteed to see the doc since we do execute the refresh.
+            // we can't assert on the refresh result since the shard might have been refreshed in the background concurrently
+            assertFalse(shard.isSearchIdle());
+        } else if (idleAfter == TimeValue.ZERO) {
+            // with ZERO the scheduled refresh is skipped, but we are still guaranteed to see the doc below since the
+            // search will wait for a refresh in the background
+            assertFalse(refreshed);
+            assertTrue(shard.isSearchIdle());
+        }
+        assertHitCount(client().prepareSearch().get(), 1);
+        // one stateless listener serves all async index requests; success and failure both count the latch down
+        ActionListener<IndexResponse> countingListener = new ActionListener<IndexResponse>() {
+            @Override
+            public void onResponse(IndexResponse indexResponse) {
+                pendingIndexOps.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                pendingIndexOps.countDown();
+                throw new AssertionError(e);
+            }
+        };
+        for (int docId = 1; docId < docCount; docId++) {
+            client().prepareIndex("test", "test", Integer.toString(docId))
+                .setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
+                .execute(countingListener);
+        }
+        pendingIndexOps.await();
+        searcher.join();
+    }
+
+    /**
+     * Checks that searches on a search idle shard still see every indexed document while an explicit refresh and a
+     * refresh interval change are in flight, and that the scheduled refresh is executed again once the refresh
+     * interval has been set explicitly.
+     */
+    public void testPendingRefreshWithIntervalChange() throws InterruptedException {
+        Settings idleImmediately = Settings.builder()
+            .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
+            .build();
+        IndexService indexService = createIndex("test", idleImmediately);
+        assertFalse(indexService.getIndexSettings().isExplicitRefresh());
+        ensureGreen();
+        assertNoSearchHits(client().prepareSearch().get());
+        client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
+        IndexShard shard = indexService.getShard(0);
+        // the shard is search idle, so the scheduled refresh is skipped
+        assertFalse(shard.scheduledRefresh());
+        assertTrue(shard.isSearchIdle());
+        CountDownLatch refreshDone = new CountDownLatch(1);
+        // async on purpose to make sure it happens concurrently
+        client().admin().indices().prepareRefresh().execute(ActionListener.wrap(refreshDone::countDown));
+        assertHitCount(client().prepareSearch().get(), 1);
+        client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
+        assertFalse(shard.scheduledRefresh());
+
+        // now disable background refresh and make sure the refresh happens
+        CountDownLatch settingsUpdated = new CountDownLatch(1);
+        Settings noBackgroundRefresh = Settings.builder()
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
+            .build();
+        client().admin().indices()
+            .prepareUpdateSettings("test")
+            .setSettings(noBackgroundRefresh)
+            .execute(ActionListener.wrap(settingsUpdated::countDown));
+        assertHitCount(client().prepareSearch().get(), 2);
+        // wait for both to ensure we don't have in-flight operations
+        settingsUpdated.await();
+        refreshDone.await();
+
+        client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
+        // the refresh interval is explicit now, so the scheduled refresh runs even though the shard is still idle
+        assertTrue(shard.scheduledRefresh());
+        assertTrue(shard.isSearchIdle());
+        assertHitCount(client().prepareSearch().get(), 3);
+    }
 }

+ 136 - 0
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -62,7 +62,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -70,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
@@ -2585,4 +2588,137 @@ public class IndexShardTests extends IndexShardTestCase {
         public void verify(String verificationToken, DiscoveryNode localNode) {
         }
     }
+
+    /**
+     * Checks that {@code IndexShard#isSearchIdle()} follows dynamic updates of {@code index.search.idle.after}:
+     * not idle with the default value right after a refresh, idle immediately with {@code 0}, not idle with one
+     * minute, and idle again after a short (10ms) period without searcher access.
+     */
+    public void testIsSearchIdle() throws Exception {
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData metaData = IndexMetaData.builder("test")
+            .putMapping("test", "{ \"properties\": { \"foo\":  { \"type\": \"text\"}}}")
+            .settings(settings)
+            .primaryTerm(0, 1).build();
+        IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
+        recoverShardFromStore(primary);
+        indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
+        assertTrue(primary.getEngine().refreshNeeded());
+        assertTrue(primary.scheduledRefresh());
+        assertFalse(primary.isSearchIdle());
+
+        IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings();
+        settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
+        scopedSettings.applySettings(settings);
+        assertTrue(primary.isSearchIdle());
+
+        settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(1))
+            .build();
+        scopedSettings.applySettings(settings);
+        assertFalse(primary.isSearchIdle());
+
+        settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10))
+            .build();
+        scopedSettings.applySettings(settings);
+        // wait until the shard has become idle: nothing accesses a searcher while we wait, so the shard can only move
+        // from non-idle to idle here. Waiting for the opposite state could never succeed once the 10ms have elapsed.
+        assertBusy(() -> assertTrue(primary.isSearchIdle()));
+        do {
+            // now loop until we are fast enough... shouldn't take long
+            primary.acquireSearcher("test").close();
+        } while (primary.isSearchIdle());
+        closeShards(primary);
+    }
+
+    /**
+     * Checks the interplay of scheduled refreshes and {@code awaitShardSearchActive}: a scheduled refresh is skipped on
+     * a search idle shard, registering search-active listeners updates the last searcher access time, the listeners
+     * see the refreshed documents after the next scheduled refresh, and a listener registered when nothing is
+     * pending is called with {@code false}.
+     */
+    public void testScheduledRefresh() throws IOException, InterruptedException {
+        Settings baseSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData indexMetaData = IndexMetaData.builder("test")
+            .putMapping("test", "{ \"properties\": { \"foo\":  { \"type\": \"text\"}}}")
+            .settings(baseSettings)
+            .primaryTerm(0, 1).build();
+        IndexShard shard = newShard(new ShardId(indexMetaData.getIndex(), 0), true, "n1", indexMetaData, null);
+        recoverShardFromStore(shard);
+        indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
+        assertTrue(shard.getEngine().refreshNeeded());
+        assertTrue(shard.scheduledRefresh());
+        IndexScopedSettings scopedSettings = shard.indexSettings().getScopedSettings();
+        Settings idleImmediately = Settings.builder().put(baseSettings)
+            .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
+            .build();
+        scopedSettings.applySettings(idleImmediately);
+
+        assertFalse(shard.getEngine().refreshNeeded());
+        indexDoc(shard, "test", "1", "{\"foo\" : \"bar\"}");
+        assertTrue(shard.getEngine().refreshNeeded());
+        long accessBeforeAwait = shard.getLastSearcherAccess();
+        // the shard is search idle now: the scheduled refresh is skipped and must not count as a searcher access
+        assertFalse(shard.scheduledRefresh());
+        assertEquals(accessBeforeAwait, shard.getLastSearcherAccess());
+        // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below
+        awaitBusy(() -> shard.getThreadPool().relativeTimeInMillis() > accessBeforeAwait);
+        final int listenerCount = 10;
+        CountDownLatch parkedListeners = new CountDownLatch(listenerCount);
+        for (int n = 0; n < listenerCount; n++) {
+            shard.awaitShardSearchActive(refreshed -> {
+                assertTrue(refreshed);
+                try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
+                    assertEquals(2, searcher.reader().numDocs());
+                } finally {
+                    parkedListeners.countDown();
+                }
+            });
+        }
+        assertNotEquals("awaitShardSearchActive must access a searcher to remove search idle state", accessBeforeAwait,
+            shard.getLastSearcherAccess());
+        assertTrue(accessBeforeAwait < shard.getLastSearcherAccess());
+        // no refresh has happened yet, so only the first document is visible
+        try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
+            assertEquals(1, searcher.reader().numDocs());
+        }
+        assertTrue(shard.getEngine().refreshNeeded());
+        assertTrue(shard.scheduledRefresh());
+        parkedListeners.await();
+        CountDownLatch immediateListener = new CountDownLatch(1);
+        shard.awaitShardSearchActive(refreshed -> {
+            assertFalse(refreshed);
+            try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
+                assertEquals(2, searcher.reader().numDocs());
+            } finally {
+                immediateListener.countDown();
+            }
+
+        });
+        immediateListener.await();
+        closeShards(shard);
+    }
+
+    /**
+     * Checks that a scheduled refresh is executed whenever a refresh listener is registered, both with the default
+     * search idle time and after {@code index.search.idle.after} has been set to zero.
+     */
+    public void testRefreshIsNeededWithRefreshListeners() throws IOException, InterruptedException {
+        Settings baseSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData indexMetaData = IndexMetaData.builder("test")
+            .putMapping("test", "{ \"properties\": { \"foo\":  { \"type\": \"text\"}}}")
+            .settings(baseSettings)
+            .primaryTerm(0, 1).build();
+        IndexShard shard = newShard(new ShardId(indexMetaData.getIndex(), 0), true, "n1", indexMetaData, null);
+        recoverShardFromStore(shard);
+        indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
+        assertTrue(shard.getEngine().refreshNeeded());
+        assertTrue(shard.scheduledRefresh());
+
+        // first round: listener registered with the default search idle time
+        Engine.IndexResult indexed = indexDoc(shard, "test", "1", "{\"foo\" : \"bar\"}");
+        CountDownLatch firstListener = new CountDownLatch(1);
+        shard.addRefreshListener(indexed.getTranslogLocation(), r -> firstListener.countDown());
+        assertEquals(1, firstListener.getCount());
+        assertTrue(shard.getEngine().refreshNeeded());
+        assertTrue(shard.scheduledRefresh());
+        firstListener.await();
+
+        IndexScopedSettings scopedSettings = shard.indexSettings().getScopedSettings();
+        Settings idleImmediately = Settings.builder().put(baseSettings)
+            .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
+            .build();
+        scopedSettings.applySettings(idleImmediately);
+
+        // second round: the idle time is zero now, yet the pending listener still makes the scheduled refresh run
+        indexed = indexDoc(shard, "test", "2", "{\"foo\" : \"bar\"}");
+        CountDownLatch secondListener = new CountDownLatch(1);
+        shard.addRefreshListener(indexed.getTranslogLocation(), r -> secondListener.countDown());
+        assertEquals(1, secondListener.getCount());
+        assertTrue(shard.getEngine().refreshNeeded());
+        assertTrue(shard.scheduledRefresh());
+        secondListener.await();
+        closeShards(shard);
+    }
 }

+ 13 - 2
docs/reference/index-modules.asciidoc

@@ -107,11 +107,22 @@ specific index module:
     Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
     for the upper bound (e.g. `0-all`).  Defaults to `false` (i.e. disabled).
 
+`index.search.idle.after`::
+    How long a shard can go without receiving a search or get request before it
+    is considered search idle. Defaults to `30s`.
+
 `index.refresh_interval`::
 
     How often to perform a refresh operation, which makes recent changes to the
-    index visible to search.  Defaults to `1s`.  Can be set to `-1` to disable
-    refresh.
+    index visible to search. Defaults to `1s`.  Can be set to `-1` to disable
+    refresh. If this setting is not explicitly set, shards that haven't seen
+    search traffic for at least `index.search.idle.after` will not receive
+    background refreshes until they receive a search request. Searches that hit an
+    idle shard where a refresh is pending will wait for the next background
+    refresh (within `1s`). This behavior aims to automatically optimize bulk
+    indexing in the default case when no searches are performed. In order to opt
+    out of this behavior, an explicit value of `1s` should be set as the refresh
+    interval.
 
 `index.max_result_window`::
 

+ 11 - 1
docs/reference/migration/migrate_7_0/indices.asciidoc

@@ -44,4 +44,14 @@ Indices created with version `7.0.0` onwards will have an automatic `index.numbe
 value set. This might change how documents are distributed across shards depending on how many
 shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the
 `index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time.
-Note: if the number of routing shards equals the number of shards `_split` operations are not supported.
+Note: if the number of routing shards equals the number of shards `_split` operations are not supported.
+
+==== Skipped background refresh on search idle shards.
+
+Shards belonging to an index that does not have an explicit
+`index.refresh_interval` configured will no longer refresh in the background
+once the shard becomes "search idle", i.e. the shard hasn't seen any search
+traffic for `index.search.idle.after` (defaults to `30s`). Searches
+that access a search idle shard will be "parked" until the next refresh
+happens. Indexing requests with `refresh=wait_for` will also trigger
+a background refresh.