Browse Source

Introduce a `search_throttled` threadpool (#33732)

Today all searches happen on the search threadpool which is the correct
behavior in almost any case. Yet, there are exceptions where for instance
searches searches should be passed through a single-thread
thread-pool to reduce impact on a node. This change adds a index-private setting that allows to mark an index as throttled for searches and forks off all non-stats searcher access to this thread-pool for indices that are marked as `index.search.throttled`
Simon Willnauer 7 years ago
parent
commit
3522b9084b
17 changed files with 446 additions and 246 deletions
  1. 2 2
      server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java
  2. 7 0
      server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java
  3. 7 0
      server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java
  4. 7 0
      server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java
  5. 7 6
      server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
  6. 21 50
      server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
  7. 43 2
      server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
  8. 11 42
      server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
  9. 7 0
      server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java
  10. 7 0
      server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java
  11. 1 0
      server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  12. 21 0
      server/src/main/java/org/elasticsearch/index/IndexSettings.java
  13. 193 122
      server/src/main/java/org/elasticsearch/search/SearchService.java
  14. 4 0
      server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
  15. 7 6
      server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java
  16. 3 3
      server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java
  17. 98 13
      server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

@@ -200,7 +200,7 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<Valid
         } catch (QueryShardException|ParsingException e) {
             valid = false;
             error = e.getDetailedMessage();
-        } catch (AssertionError|IOException e) {
+        } catch (AssertionError e) {
             valid = false;
             error = e.getMessage();
         } finally {
@@ -210,7 +210,7 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<Valid
         return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error);
     }
 
-    private String explain(SearchContext context, boolean rewritten) throws IOException {
+    private String explain(SearchContext context, boolean rewritten) {
         Query query = context.query();
         if (rewritten && query instanceof MatchNoDocsQuery) {
             return context.parsedQuery().query().toString();

+ 7 - 0
server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

@@ -152,4 +152,11 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
                 clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()
         );
     }
+
+    @Override
+    protected String getExecutor(ExplainRequest request, ShardId shardId) {
+        IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
+        return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
+            shardId);
+    }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -111,4 +111,11 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
     protected GetResponse newResponse() {
         return new GetResponse();
     }
+
+    @Override
+    protected String getExecutor(GetRequest request, ShardId shardId) {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
+            shardId);
+    }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

@@ -102,4 +102,11 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
 
         return response;
     }
+
+    @Override
+    protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
+            shardId);
+    }
 }

+ 7 - 6
server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

@@ -23,6 +23,7 @@ import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.transport.Transport;
 
@@ -40,7 +41,7 @@ import java.util.stream.Stream;
  * which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
  * large portion of the clusters indices.
  */
-final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
+final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
 
     private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
     private final GroupShardsIterator<SearchShardIterator> shardsIts;
@@ -67,13 +68,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
 
     @Override
     protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
-                                       SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
+                                       SearchActionListener<SearchService.CanMatchResponse> listener) {
         getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
             buildShardSearchRequest(shardIt), getTask(), listener);
     }
 
     @Override
-    protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
+    protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
                                        SearchPhaseContext context) {
 
         return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
@@ -100,7 +101,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
     }
 
     private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
-        SearchPhaseResults<SearchTransportService.CanMatchResponse> {
+        SearchPhaseResults<SearchService.CanMatchResponse> {
 
         private final FixedBitSet possibleMatches;
         private int numPossibleMatches;
@@ -111,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
         }
 
         @Override
-        void consumeResult(SearchTransportService.CanMatchResponse result) {
+        void consumeResult(SearchService.CanMatchResponse result) {
             if (result.canMatch()) {
                 consumeShardFailure(result.getShardIndex());
             }
@@ -139,7 +140,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
         }
 
         @Override
-        Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
+        Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
             return Stream.empty();
         }
     }

+ 21 - 50
server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

@@ -23,7 +23,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.OriginalIndices;
-import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -112,9 +112,9 @@ public class SearchTransportService extends AbstractComponent {
     }
 
     public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
-                            ActionListener<CanMatchResponse> listener) {
+                            ActionListener<SearchService.CanMatchResponse> listener) {
         transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
-            TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
+            TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new));
     }
 
     public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
@@ -349,83 +349,54 @@ public class SearchTransportService extends AbstractComponent {
 
         transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
             (request, channel, task) -> {
-                searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>(
+                searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
                     channel, QUERY_ACTION_NAME, request));
             });
         TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
                 (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
 
-        transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
+        transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new,
             (request, channel, task) -> {
-                QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
-                channel.sendResponse(result);
+                searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME,
+                    request));
             });
         TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
 
-        transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
+        transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
             (request, channel, task) -> {
-                ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
-                channel.sendResponse(result);
+                searchService.executeQueryPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, QUERY_SCROLL_ACTION_NAME,
+                 request));
             });
         TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
 
-        transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
+        transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, InternalScrollSearchRequest::new,
             (request, channel, task) -> {
-                ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
-                channel.sendResponse(result);
+                searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
+                    QUERY_FETCH_SCROLL_ACTION_NAME, request));
             });
         TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
 
-        transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
+        transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ShardFetchRequest::new,
             (request, channel, task) -> {
-                FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
-                channel.sendResponse(result);
+                searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel,
+                    FETCH_ID_SCROLL_ACTION_NAME, request));
             });
         TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
 
-        transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, true, true, ShardFetchSearchRequest::new,
+        transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
             (request, channel, task) -> {
-                FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
-                channel.sendResponse(result);
+                searchService.executeFetchPhase(request, (SearchTask)task, new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME,
+                    request));
             });
         TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
 
         // this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
         transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
             (request, channel, task) -> {
-                boolean canMatch = searchService.canMatch(request);
-                channel.sendResponse(new CanMatchResponse(canMatch));
+                searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
             });
         TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
-                (Supplier<TransportResponse>) CanMatchResponse::new);
-    }
-
-    public static final class CanMatchResponse extends SearchPhaseResult {
-        private boolean canMatch;
-
-        public CanMatchResponse() {
-        }
-
-        public CanMatchResponse(boolean canMatch) {
-            this.canMatch = canMatch;
-        }
-
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            canMatch = in.readBoolean();
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeBoolean(canMatch);
-        }
-
-        public boolean canMatch() {
-            return canMatch;
-        }
+                (Supplier<TransportResponse>) SearchService.CanMatchResponse::new);
     }
 
 

+ 43 - 2
server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportChannel;
@@ -57,6 +58,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
     protected final IndexNameExpressionResolver indexNameExpressionResolver;
 
     final String transportShardAction;
+    private final String shardExecutor;
 
     protected TransportBroadcastAction(Settings settings, String actionName, ClusterService clusterService,
                                        TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
@@ -66,8 +68,9 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
         this.transportService = transportService;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.transportShardAction = actionName + "[s]";
+        this.shardExecutor = shardExecutor;
 
-        transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
+        transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler());
     }
 
     @Override
@@ -276,7 +279,45 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
 
         @Override
         public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
-            channel.sendResponse(shardOperation(request, task));
+            asyncShardOperation(request, task,  new ActionListener<ShardResponse>() {
+                @Override
+                public void onResponse(ShardResponse response) {
+                    try {
+                        channel.sendResponse(response);
+                    } catch (Exception e) {
+                        onFailure(e);
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    try {
+                        channel.sendResponse(e);
+                    } catch (Exception e1) {
+                        logger.warn(() -> new ParameterizedMessage(
+                            "Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
+                    }
+                }
+            });
         }
     }
+
+    protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
+        transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                listener.onResponse(shardOperation(request, task));
+            }
+        });
+    }
+
+    protected String getExecutor(ShardRequest request) {
+        return shardExecutor;
+    }
+
 }

+ 11 - 42
server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
@@ -49,7 +50,6 @@ import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -66,8 +66,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
     protected final TransportService transportService;
     protected final IndexNameExpressionResolver indexNameExpressionResolver;
 
-    final String transportShardAction;
-    final String executor;
+    private final String transportShardAction;
+    private final String executor;
 
     protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
                                          TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
@@ -104,7 +104,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
     protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
 
     protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
-        threadPool.executor(this.executor).execute(new AbstractRunnable() {
+        threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
             @Override
             public void onFailure(Exception e) {
                 listener.onFailure(e);
@@ -274,25 +274,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
         @Override
         public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception {
             // if we have a local operation, execute it on a thread since we don't spawn
-            execute(request, new ActionListener<Response>() {
-                @Override
-                public void onResponse(Response result) {
-                    try {
-                        channel.sendResponse(result);
-                    } catch (Exception e) {
-                        onFailure(e);
-                    }
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    try {
-                        channel.sendResponse(e);
-                    } catch (Exception e1) {
-                        logger.warn("failed to send response for get", e1);
-                    }
-                }
-            });
+            execute(request, new HandledTransportAction.ChannelActionListener<>(channel, actionName, request));
         }
     }
 
@@ -303,25 +285,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
             if (logger.isTraceEnabled()) {
                 logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
             }
-            asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
-                @Override
-                public void onResponse(Response response) {
-                    try {
-                        channel.sendResponse(response);
-                    } catch (IOException e) {
-                        onFailure(e);
-                    }
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    try {
-                        channel.sendResponse(e);
-                    } catch (IOException e1) {
-                        throw new UncheckedIOException(e1);
-                    }
-                }
-            });
+            asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel,
+                transportShardAction, request));
         }
     }
     /**
@@ -344,4 +309,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
             return concreteIndex;
         }
     }
+
+    protected String getExecutor(Request request, ShardId shardId) {
+        return executor;
+    }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java

@@ -96,4 +96,11 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
 
         return response;
     }
+
+    @Override
+    protected String getExecutor(MultiTermVectorsShardRequest request, ShardId shardId) {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
+            shardId);
+    }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

@@ -113,4 +113,11 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
     protected TermVectorsResponse newResponse() {
         return new TermVectorsResponse();
     }
+
+    @Override
+    protected String getExecutor(TermVectorsRequest request, ShardId shardId) {
+        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
+            shardId);
+    }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -142,6 +142,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
         IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
         IndexSettings.INDEX_SEARCH_IDLE_AFTER,
+        IndexSettings.INDEX_SEARCH_THROTTLED,
         IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
         FieldMapper.IGNORE_MALFORMED_SETTING,
         FieldMapper.COERCE_SETTING,

+ 21 - 0
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -277,6 +277,12 @@ public final class IndexSettings {
         return s;
        }, Property.Dynamic, Property.IndexScope);
 
+    /**
+     * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently
+     */
+    public static final Setting<Boolean> INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false,
+        Property.IndexScope, Property.PrivateIndex, Property.Dynamic);
+
     private final Index index;
     private final Version version;
     private final Logger logger;
@@ -319,6 +325,7 @@ public final class IndexSettings {
     private volatile int maxAnalyzedOffset;
     private volatile int maxTermsCount;
     private volatile String defaultPipeline;
+    private volatile boolean searchThrottled;
 
     /**
      * The maximum number of refresh listeners allows on this shard.
@@ -402,6 +409,7 @@ public final class IndexSettings {
         this.indexMetaData = indexMetaData;
         numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);
 
+        this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
         this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
         this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
         this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
@@ -478,6 +486,7 @@ public final class IndexSettings {
         scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
         scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
     }
 
     private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
@@ -879,4 +888,16 @@ public final class IndexSettings {
     public long getSoftDeleteRetentionOperations() {
         return this.softDeleteRetentionOperations;
     }
+
+    /**
+     * Returns true if the this index should be searched throttled ie. using the
+     * {@link org.elasticsearch.threadpool.ThreadPool.Names#SEARCH_THROTTLED} thread-pool
+     */
+    public boolean isSearchThrottled() {
+        return searchThrottled;
+    }
+
+    private void setSearchThrottled(boolean searchThrottled) {
+        this.searchThrottled = searchThrottled;
+    }
 }

+ 193 - 122
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -30,6 +30,8 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -106,8 +108,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -344,7 +348,21 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         });
     }
 
-    SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
+    private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
+        getExecutor(id).execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            protected void doRun() {
+                listener.onResponse(executable.get());
+            }
+        });
+    }
+
+    private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
         final SearchContext context = createAndPutContext(request);
         final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
         context.incRef();
@@ -405,59 +423,63 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
     }
 
-    public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) {
-        final SearchContext context = findContext(request.id(), request);
-        SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
-        context.incRef();
-        try {
-            context.setTask(task);
-            operationListener.onPreQueryPhase(context);
-            long time = System.nanoTime();
-            contextProcessing(context);
-            processScroll(request, context);
-            queryPhase.execute(context);
-            contextProcessedSuccessfully(context);
-            operationListener.onQueryPhase(context, System.nanoTime() - time);
-            return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
-        } catch (Exception e) {
-            operationListener.onFailedQueryPhase(context);
-            logger.trace("Query phase failed", e);
-            processFailure(context, e);
-            throw ExceptionsHelper.convertToRuntime(e);
-        } finally {
-            cleanContext(context);
-        }
+    public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
+        runAsync(request.id(), () -> {
+            final SearchContext context = findContext(request.id(), request);
+            SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
+            context.incRef();
+            try {
+                context.setTask(task);
+                operationListener.onPreQueryPhase(context);
+                long time = System.nanoTime();
+                contextProcessing(context);
+                processScroll(request, context);
+                queryPhase.execute(context);
+                contextProcessedSuccessfully(context);
+                operationListener.onQueryPhase(context, System.nanoTime() - time);
+                return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
+            } catch (Exception e) {
+                operationListener.onFailedQueryPhase(context);
+                logger.trace("Query phase failed", e);
+                processFailure(context, e);
+                throw ExceptionsHelper.convertToRuntime(e);
+            } finally {
+                cleanContext(context);
+            }
+        }, listener);
     }
 
-    public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) {
-        final SearchContext context = findContext(request.id(), request);
-        context.setTask(task);
-        IndexShard indexShard = context.indexShard();
-        SearchOperationListener operationListener = indexShard.getSearchOperationListener();
-        context.incRef();
-        try {
-            contextProcessing(context);
-            context.searcher().setAggregatedDfs(request.dfs());
+    public void executeQueryPhase(QuerySearchRequest request, SearchTask task, ActionListener<QuerySearchResult> listener) {
+        runAsync(request.id(), () -> {
+            final SearchContext context = findContext(request.id(), request);
+            context.setTask(task);
+            IndexShard indexShard = context.indexShard();
+            SearchOperationListener operationListener = indexShard.getSearchOperationListener();
+            context.incRef();
+            try {
+                contextProcessing(context);
+                context.searcher().setAggregatedDfs(request.dfs());
 
-            operationListener.onPreQueryPhase(context);
-            long time = System.nanoTime();
-            queryPhase.execute(context);
-            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
-                // no hits, we can release the context since there will be no fetch phase
-                freeContext(context.id());
-            } else {
-                contextProcessedSuccessfully(context);
+                operationListener.onPreQueryPhase(context);
+                long time = System.nanoTime();
+                queryPhase.execute(context);
+                if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
+                    // no hits, we can release the context since there will be no fetch phase
+                    freeContext(context.id());
+                } else {
+                    contextProcessedSuccessfully(context);
+                }
+                operationListener.onQueryPhase(context, System.nanoTime() - time);
+                return context.queryResult();
+            } catch (Exception e) {
+                operationListener.onFailedQueryPhase(context);
+                logger.trace("Query phase failed", e);
+                processFailure(context, e);
+                throw ExceptionsHelper.convertToRuntime(e);
+            } finally {
+                cleanContext(context);
             }
-            operationListener.onQueryPhase(context, System.nanoTime() - time);
-            return context.queryResult();
-        } catch (Exception e) {
-            operationListener.onFailedQueryPhase(context);
-            logger.trace("Query phase failed", e);
-            processFailure(context, e);
-            throw ExceptionsHelper.convertToRuntime(e);
-        } finally {
-            cleanContext(context);
-        }
+        }, listener);
     }
 
     private boolean fetchPhaseShouldFreeContext(SearchContext context) {
@@ -470,66 +492,83 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }
 
-    public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) {
-        final SearchContext context = findContext(request.id(), request);
-        context.incRef();
-        try {
-            context.setTask(task);
-            contextProcessing(context);
-            SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
-            processScroll(request, context);
-            operationListener.onPreQueryPhase(context);
-            final long time = System.nanoTime();
+    final Executor getExecutor(long id) {
+        SearchContext context = activeContexts.get(id);
+        if (context == null) {
+            throw new SearchContextMissingException(id);
+        }
+        return getExecutor(context.indexShard());
+    }
+
+    private Executor getExecutor(IndexShard indexShard) {
+        assert indexShard != null;
+        return threadPool.executor(indexShard.indexSettings().isSearchThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH);
+    }
+
+    public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask task,
+                                  ActionListener<ScrollQueryFetchSearchResult> listener) {
+        runAsync(request.id(), () -> {
+            final SearchContext context = findContext(request.id(), request);
+            context.incRef();
             try {
-                queryPhase.execute(context);
+                context.setTask(task);
+                contextProcessing(context);
+                SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
+                processScroll(request, context);
+                operationListener.onPreQueryPhase(context);
+                final long time = System.nanoTime();
+                try {
+                    queryPhase.execute(context);
+                } catch (Exception e) {
+                    operationListener.onFailedQueryPhase(context);
+                    throw ExceptionsHelper.convertToRuntime(e);
+                }
+                long afterQueryTime = System.nanoTime();
+                operationListener.onQueryPhase(context, afterQueryTime - time);
+                QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
+                return new ScrollQueryFetchSearchResult(fetchSearchResult,
+                    context.shardTarget());
             } catch (Exception e) {
-                operationListener.onFailedQueryPhase(context);
+                logger.trace("Fetch phase failed", e);
+                processFailure(context, e);
                 throw ExceptionsHelper.convertToRuntime(e);
+            } finally {
+                cleanContext(context);
             }
-            long afterQueryTime = System.nanoTime();
-            operationListener.onQueryPhase(context, afterQueryTime - time);
-            QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
-
-            return new ScrollQueryFetchSearchResult(fetchSearchResult,
-                context.shardTarget());
-        } catch (Exception e) {
-            logger.trace("Fetch phase failed", e);
-            processFailure(context, e);
-            throw ExceptionsHelper.convertToRuntime(e);
-        } finally {
-            cleanContext(context);
-        }
+        }, listener);
     }
 
-    public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) {
-        final SearchContext context = findContext(request.id(), request);
-        final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
-        context.incRef();
-        try {
-            context.setTask(task);
-            contextProcessing(context);
-            if (request.lastEmittedDoc() != null) {
-                context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
-            }
-            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
-            operationListener.onPreFetchPhase(context);
-            long time = System.nanoTime();
-            fetchPhase.execute(context);
-            if (fetchPhaseShouldFreeContext(context)) {
-                freeContext(request.id());
-            } else {
-                contextProcessedSuccessfully(context);
+    public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
+        runAsync(request.id(), () -> {
+            final SearchContext context = findContext(request.id(), request);
+            final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
+            context.incRef();
+            try {
+                context.setTask(task);
+                contextProcessing(context);
+                if (request.lastEmittedDoc() != null) {
+                    context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
+                }
+                context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
+                operationListener.onPreFetchPhase(context);
+                long time = System.nanoTime();
+                fetchPhase.execute(context);
+                if (fetchPhaseShouldFreeContext(context)) {
+                    freeContext(request.id());
+                } else {
+                    contextProcessedSuccessfully(context);
+                }
+                operationListener.onFetchPhase(context, System.nanoTime() - time);
+                return context.fetchResult();
+            } catch (Exception e) {
+                operationListener.onFailedFetchPhase(context);
+                logger.trace("Fetch phase failed", e);
+                processFailure(context, e);
+                throw ExceptionsHelper.convertToRuntime(e);
+            } finally {
+                cleanContext(context);
             }
-            operationListener.onFetchPhase(context, System.nanoTime() - time);
-            return context.fetchResult();
-        } catch (Exception e) {
-            operationListener.onFailedFetchPhase(context);
-            logger.trace("Fetch phase failed", e);
-            processFailure(context, e);
-            throw ExceptionsHelper.convertToRuntime(e);
-        } finally {
-            cleanContext(context);
-        }
+        }, listener);
     }
 
     private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException {
@@ -985,6 +1024,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }
 
+
+    public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
+        try {
+            listener.onResponse(new CanMatchResponse(canMatch(request)));
+        } catch (IOException e) {
+            listener.onFailure(e);
+        }
+    }
+
     /**
      * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
      * if the execution of a the search request can be early terminated without executing it. This is for instance not possible if
@@ -1009,31 +1057,27 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
      * The action listener is guaranteed to be executed on the search thread-pool
      */
     private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
+        IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
+        Executor executor = getExecutor(shard);
         ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
-            threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
-                }
-
-                @Override
-                protected void doRun() throws Exception {
-                    listener.onResponse(request);
-                }
-            }), listener::onFailure);
-        IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId());
-        if (shardOrNull != null) {
             // now we need to check if there is a pending refresh and register
-                ActionListener<Rewriteable> finalListener = actionListener;
-                actionListener = ActionListener.wrap(r ->
-                        shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure);
-        }
+            shard.awaitShardSearchActive(b ->
+                executor.execute(new AbstractRunnable() {
+                    @Override
+                    public void onFailure(Exception e) {
+                        listener.onFailure(e);
+                    }
+
+                    @Override
+                    protected void doRun() {
+                        listener.onResponse(request);
+                    }
+                })
+            ), listener::onFailure);
         // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
         // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
         // adding a lot of overhead
         Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
-
-
     }
 
     /**
@@ -1050,4 +1094,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
         return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
     }
+
+    public static final class CanMatchResponse extends SearchPhaseResult {
+        private boolean canMatch;
+
+        public CanMatchResponse() {
+        }
+
+        public CanMatchResponse(boolean canMatch) {
+            this.canMatch = canMatch;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            canMatch = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(canMatch);
+        }
+
+        public boolean canMatch() {
+            return canMatch;
+        }
+    }
 }

+ 4 - 0
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -71,6 +71,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         public static final String ANALYZE = "analyze";
         public static final String WRITE = "write";
         public static final String SEARCH = "search";
+        public static final String SEARCH_THROTTLED = "search_throttled";
         public static final String MANAGEMENT = "management";
         public static final String FLUSH = "flush";
         public static final String REFRESH = "refresh";
@@ -135,6 +136,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
         map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
         map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
+        map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
         THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
     }
 
@@ -175,6 +177,8 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
         builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
                         Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
+        builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings,
+            Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200));
         builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
         // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
         // the assumption here is that the listeners should be very lightweight on the listeners side

+ 7 - 6
server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.search.SearchPhaseResult;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
@@ -64,8 +65,8 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 
             @Override
             public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
-                                     ActionListener<CanMatchResponse> listener) {
-                new Thread(() -> listener.onResponse(new CanMatchResponse(request.shardId().id() == 0 ? shard1 :
+                                     ActionListener<SearchService.CanMatchResponse> listener) {
+                new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(request.shardId().id() == 0 ? shard1 :
                     shard2))).start();
             }
         };
@@ -123,14 +124,14 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 
             @Override
             public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
-                                     ActionListener<CanMatchResponse> listener) {
+                                     ActionListener<SearchService.CanMatchResponse> listener) {
                 boolean throwException = request.shardId().id() != 0;
                 if (throwException && randomBoolean()) {
                     throw new IllegalArgumentException("boom");
                 } else {
                     new Thread(() -> {
                         if (throwException == false) {
-                            listener.onResponse(new CanMatchResponse(shard1));
+                            listener.onResponse(new SearchService.CanMatchResponse(shard1));
                         } else {
                             listener.onFailure(new NullPointerException());
                         }
@@ -192,8 +193,8 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
                             Transport.Connection connection,
                             ShardSearchTransportRequest request,
                             SearchTask task,
-                            ActionListener<CanMatchResponse> listener) {
-                        listener.onResponse(new CanMatchResponse(randomBoolean()));
+                            ActionListener<SearchService.CanMatchResponse> listener) {
+                        listener.onResponse(new SearchService.CanMatchResponse(randomBoolean()));
                     }
                 };
 

+ 3 - 3
server/src/test/java/org/elasticsearch/indices/settings/InternalOrPrivateSettingsPlugin.java

@@ -64,14 +64,14 @@ public class InternalOrPrivateSettingsPlugin extends Plugin implements ActionPlu
 
     public static class UpdateInternalOrPrivateAction extends Action<UpdateInternalOrPrivateAction.Response> {
 
-        static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction();
+        public static final UpdateInternalOrPrivateAction INSTANCE = new UpdateInternalOrPrivateAction();
         private static final String NAME = "indices:admin/settings/update-internal-or-private-index";
 
         public UpdateInternalOrPrivateAction() {
             super(NAME);
         }
 
-        static class Request extends MasterNodeRequest<Request> {
+        public static class Request extends MasterNodeRequest<Request> {
 
             private String index;
             private String key;
@@ -81,7 +81,7 @@ public class InternalOrPrivateSettingsPlugin extends Plugin implements ActionPlu
 
             }
 
-            Request(final String index, final String key, final String value) {
+            public Request(final String index, final String key, final String value) {
                 this.index = index;
                 this.key = key;
                 this.value = value;

+ 98 - 13
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchTask;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -35,7 +36,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.AbstractQueryBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.MatchNoneQueryBuilder;
@@ -44,7 +48,10 @@ import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.SearchOperationListener;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.settings.InternalOrPrivateSettingsPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.script.MockScriptEngine;
@@ -55,6 +62,7 @@ import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuil
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValueType;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.FetchSearchResult;
 import org.elasticsearch.search.fetch.ShardFetchRequest;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.SearchContext;
@@ -77,9 +85,12 @@ import static java.util.Collections.singletonList;
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
 
 public class SearchServiceTests extends ESSingleNodeTestCase {
 
@@ -90,19 +101,51 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class);
+        return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class, InternalOrPrivateSettingsPlugin.class);
     }
 
     public static class CustomScriptPlugin extends MockScriptPlugin {
 
         static final String DUMMY_SCRIPT = "dummyScript";
 
+
         @Override
         protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
-            return Collections.singletonMap(DUMMY_SCRIPT, vars -> {
-                return "dummy";
+            return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy");
+        }
+
+        @Override
+        public void onIndexModule(IndexModule indexModule) {
+            indexModule.addSearchOperationListener(new SearchOperationListener() {
+                @Override
+                public void onNewContext(SearchContext context) {
+                    if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]"));
+                    } else {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
+                    }
+                }
+
+                @Override
+                public void onFetchPhase(SearchContext context, long tookInNanos) {
+                    if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]"));
+                    } else {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
+                    }
+                }
+
+                @Override
+                public void onQueryPhase(SearchContext context, long tookInNanos) {
+                    if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]"));
+                    } else {
+                        assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
+                    }
+                }
             });
         }
+
     }
 
     @Override
@@ -210,15 +253,24 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
             final int rounds = scaledRandomIntBetween(100, 10000);
             for (int i = 0; i < rounds; i++) {
                 try {
-                    SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
+                    try {
+                        PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
+                        service.executeQueryPhase(
                             new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
-                                    new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
-                                    true, null, null),
-                        new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
-                    IntArrayList intCursors = new IntArrayList(1);
-                    intCursors.add(0);
-                    ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */);
-                    service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
+                                new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
+                                true, null, null),
+                            new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
+                        SearchPhaseResult searchPhaseResult = result.get();
+                        IntArrayList intCursors = new IntArrayList(1);
+                        intCursors.add(0);
+                        ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */);
+                        PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
+                        service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
+                        listener.get();
+                    } catch (ExecutionException ex) {
+                        assertThat(ex.getCause(), instanceOf(RuntimeException.class));
+                        throw ((RuntimeException)ex.getCause());
+                    }
                 } catch (AlreadyClosedException ex) {
                     throw ex;
                 } catch (IllegalStateException ex) {
@@ -467,4 +519,37 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
             .suggest(new SuggestBuilder())));
 
     }
+
+    public void testSetSearchThrottled() {
+        createIndex("throttled_threadpool_index");
+        client().execute(
+            InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE,
+            new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index",
+                IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "true"))
+            .actionGet();
+        final SearchService service = getInstanceFromNode(SearchService.class);
+        Index index = resolveIndex("throttled_threadpool_index");
+        assertTrue(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled());
+        client().prepareIndex("throttled_threadpool_index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
+        SearchResponse searchResponse = client().prepareSearch("throttled_threadpool_index").setSize(1).get();
+        assertSearchHits(searchResponse, "1");
+        // we add a search action listener in a plugin above to assert that this is actually used
+        client().execute(
+            InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.INSTANCE,
+            new InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request("throttled_threadpool_index",
+                IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), "false"))
+            .actionGet();
+
+        IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
+            client().admin().indices().prepareUpdateSettings("throttled_threadpool_index").setSettings(Settings.builder().put(IndexSettings
+                .INDEX_SEARCH_THROTTLED.getKey(), false)).get());
+        assertEquals("can not update private setting [index.search.throttled]; this setting is managed by Elasticsearch",
+            iae.getMessage());
+        assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled());
+        ShardSearchLocalRequest req = new ShardSearchLocalRequest(new ShardId(index, 0), 1, SearchType.QUERY_THEN_FETCH, null,
+            Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, false, null, null);
+        Thread currentThread = Thread.currentThread();
+        // we still make sure can match is executed on the network thread
+        service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected")));
+    }
 }