|
|
@@ -68,7 +68,6 @@ import org.elasticsearch.index.shard.IndexShard;
|
|
|
import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.indices.IndicesWarmer;
|
|
|
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
|
|
|
-import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
|
|
|
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
|
|
import org.elasticsearch.node.settings.NodeSettingsService;
|
|
|
import org.elasticsearch.script.ExecutableScript;
|
|
|
@@ -180,8 +179,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
|
|
|
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
|
|
|
|
|
|
- this.indicesWarmer.addListener(new NormsWarmer());
|
|
|
- this.indicesWarmer.addListener(new FieldDataWarmer());
|
|
|
+ this.indicesWarmer.addListener(new NormsWarmer(indicesWarmer));
|
|
|
+ this.indicesWarmer.addListener(new FieldDataWarmer(indicesWarmer));
|
|
|
this.indicesWarmer.addListener(new SearchWarmer());
|
|
|
|
|
|
defaultSearchTimeout = settings.getAsTime(DEFAULT_SEARCH_TIMEOUT, NO_TIMEOUT);
|
|
|
@@ -949,11 +948,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
return this.activeContexts.size();
|
|
|
}
|
|
|
|
|
|
- static class NormsWarmer extends IndicesWarmer.Listener {
|
|
|
+ static class NormsWarmer implements IndicesWarmer.Listener {
|
|
|
+ private final IndicesWarmer indicesWarmer;
|
|
|
|
|
|
+ public NormsWarmer(IndicesWarmer indicesWarmer) {
|
|
|
+ this.indicesWarmer = indicesWarmer;
|
|
|
+ }
|
|
|
@Override
|
|
|
- public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) {
|
|
|
- final Loading defaultLoading = Loading.parse(indexMetaData.getSettings().get(NORMS_LOADING_KEY), Loading.LAZY);
|
|
|
+ public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
+ final Loading defaultLoading = Loading.parse(indexShard.getIndexSettings().getSettings().get(NORMS_LOADING_KEY), Loading.LAZY);
|
|
|
final MapperService mapperService = indexShard.mapperService();
|
|
|
final ObjectSet<String> warmUp = new ObjectHashSet<>();
|
|
|
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
|
|
@@ -971,14 +974,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
// Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task
|
|
|
- threadPool.executor(executor()).execute(new Runnable() {
|
|
|
+ indicesWarmer.getExecutor().execute(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
for (ObjectCursor<String> stringObjectCursor : warmUp) {
|
|
|
final String indexName = stringObjectCursor.value;
|
|
|
final long start = System.nanoTime();
|
|
|
- for (final LeafReaderContext ctx : context.searcher().reader().leaves()) {
|
|
|
+ for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
|
|
final NumericDocValues values = ctx.reader().getNormValues(indexName);
|
|
|
if (values != null) {
|
|
|
values.get(0);
|
|
|
@@ -1005,15 +1008,21 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) {
|
|
|
+ public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
return TerminationHandle.NO_WAIT;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static class FieldDataWarmer extends IndicesWarmer.Listener {
|
|
|
+ static class FieldDataWarmer implements IndicesWarmer.Listener {
|
|
|
+
|
|
|
+ private final IndicesWarmer indicesWarmer;
|
|
|
+
|
|
|
+ public FieldDataWarmer(IndicesWarmer indicesWarmer) {
|
|
|
+ this.indicesWarmer = indicesWarmer;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
- public TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) {
|
|
|
+ public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
final MapperService mapperService = indexShard.mapperService();
|
|
|
final Map<String, MappedFieldType> warmUp = new HashMap<>();
|
|
|
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
|
|
@@ -1048,9 +1057,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
}
|
|
|
}
|
|
|
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
|
|
- final Executor executor = threadPool.executor(executor());
|
|
|
- final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size());
|
|
|
- for (final LeafReaderContext ctx : context.searcher().reader().leaves()) {
|
|
|
+ final Executor executor = indicesWarmer.getExecutor();
|
|
|
+ final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
|
|
|
+ for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
|
|
for (final MappedFieldType fieldType : warmUp.values()) {
|
|
|
executor.execute(new Runnable() {
|
|
|
|
|
|
@@ -1081,7 +1090,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public TerminationHandle warmTopReader(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) {
|
|
|
+ public TerminationHandle warmTopReader(final IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
final MapperService mapperService = indexShard.mapperService();
|
|
|
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
|
|
|
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
|
|
@@ -1114,7 +1123,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
}
|
|
|
}
|
|
|
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
|
|
- final Executor executor = threadPool.executor(executor());
|
|
|
+ final Executor executor = indicesWarmer.getExecutor();
|
|
|
final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size());
|
|
|
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
|
|
|
executor.execute(new Runnable() {
|
|
|
@@ -1123,7 +1132,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
try {
|
|
|
final long start = System.nanoTime();
|
|
|
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
|
|
|
- ifd.loadGlobal(context.getDirectoryReader());
|
|
|
+ ifd.loadGlobal(searcher.getDirectoryReader());
|
|
|
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
|
|
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.names().fullName(), TimeValue.timeValueNanos(System.nanoTime() - start));
|
|
|
}
|
|
|
@@ -1144,83 +1153,73 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class SearchWarmer extends IndicesWarmer.Listener {
|
|
|
+ class SearchWarmer implements IndicesWarmer.Listener {
|
|
|
|
|
|
@Override
|
|
|
- public TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) {
|
|
|
- return internalWarm(indexShard, indexMetaData, context, threadPool, false);
|
|
|
+ public TerminationHandle warmNewReaders(IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
+ return internalWarm(indexShard, searcher, false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) {
|
|
|
- return internalWarm(indexShard, indexMetaData, context, threadPool, true);
|
|
|
+ public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
|
|
|
+ return internalWarm(indexShard, searcher, true);
|
|
|
}
|
|
|
|
|
|
- public TerminationHandle internalWarm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool, final boolean top) {
|
|
|
- IndexWarmersMetaData custom = indexMetaData.custom(IndexWarmersMetaData.TYPE);
|
|
|
+ public TerminationHandle internalWarm(final IndexShard indexShard, final Engine.Searcher searcher, final boolean top) {
|
|
|
+ IndexWarmersMetaData custom = indexShard.getIndexSettings().getIndexMetaData().custom(IndexWarmersMetaData.TYPE);
|
|
|
if (custom == null) {
|
|
|
return TerminationHandle.NO_WAIT;
|
|
|
}
|
|
|
- final Executor executor = threadPool.executor(executor());
|
|
|
+ final Executor executor = indicesWarmer.getExecutor();
|
|
|
final CountDownLatch latch = new CountDownLatch(custom.entries().size());
|
|
|
for (final IndexWarmersMetaData.Entry entry : custom.entries()) {
|
|
|
- executor.execute(new Runnable() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- SearchContext context = null;
|
|
|
+ executor.execute(() -> {
|
|
|
+ SearchContext context = null;
|
|
|
+ try {
|
|
|
+ long now = System.nanoTime();
|
|
|
+ final IndexService indexService = indicesService.indexServiceSafe(indexShard.shardId().index().name());
|
|
|
+ QueryParseContext queryParseContext = new QueryParseContext(indexService.queryParserService().indicesQueriesRegistry());
|
|
|
+ queryParseContext.parseFieldMatcher(indexService.queryParserService().parseFieldMatcher());
|
|
|
+ ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexShard.getIndexSettings()
|
|
|
+ .getNumberOfShards(),
|
|
|
+ SearchType.QUERY_THEN_FETCH, entry.source().build(queryParseContext), entry.types(), entry.requestCache());
|
|
|
+ context = createContext(request, searcher);
|
|
|
+ // if we use sort, we need to do query to sort on
|
|
|
+ // it and load relevant field data
|
|
|
+ // if not, we might as well set size=0 (and cache
|
|
|
+ // if needed)
|
|
|
+ if (context.sort() == null) {
|
|
|
+ context.size(0);
|
|
|
+ }
|
|
|
+ boolean canCache = indicesQueryCache.canCache(request, context);
|
|
|
+ // early terminate when we can cache, since we
|
|
|
+ // can only do proper caching on top level searcher
|
|
|
+ // also, if we can't cache, and its top, we don't
|
|
|
+ // need to execute it, since we already did when its
|
|
|
+ // not top
|
|
|
+ if (canCache != top) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ loadOrExecuteQueryPhase(request, context, queryPhase);
|
|
|
+ long took = System.nanoTime() - now;
|
|
|
+ if (indexShard.warmerService().logger().isTraceEnabled()) {
|
|
|
+ indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name());
|
|
|
+ } finally {
|
|
|
try {
|
|
|
- long now = System.nanoTime();
|
|
|
- final IndexService indexService = indicesService.indexServiceSafe(indexShard.shardId().index().name());
|
|
|
- QueryParseContext queryParseContext = new QueryParseContext(indexService.queryParserService().indicesQueriesRegistry());
|
|
|
- queryParseContext.parseFieldMatcher(indexService.queryParserService().parseFieldMatcher());
|
|
|
- ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData
|
|
|
- .getNumberOfShards(),
|
|
|
- SearchType.QUERY_THEN_FETCH, entry.source().build(queryParseContext), entry.types(), entry.requestCache());
|
|
|
- context = createContext(request, warmerContext.searcher());
|
|
|
- // if we use sort, we need to do query to sort on
|
|
|
- // it and load relevant field data
|
|
|
- // if not, we might as well set size=0 (and cache
|
|
|
- // if needed)
|
|
|
- if (context.sort() == null) {
|
|
|
- context.size(0);
|
|
|
+ if (context != null) {
|
|
|
+ freeContext(context.id());
|
|
|
+ cleanContext(context);
|
|
|
}
|
|
|
- boolean canCache = indicesQueryCache.canCache(request, context);
|
|
|
- // early terminate when we can cache, since we
|
|
|
- // can only do proper caching on top level searcher
|
|
|
- // also, if we can't cache, and its top, we don't
|
|
|
- // need to execute it, since we already did when its
|
|
|
- // not top
|
|
|
- if (canCache != top) {
|
|
|
- return;
|
|
|
- }
|
|
|
- loadOrExecuteQueryPhase(request, context, queryPhase);
|
|
|
- long took = System.nanoTime() - now;
|
|
|
- if (indexShard.warmerService().logger().isTraceEnabled()) {
|
|
|
- indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
|
|
|
- }
|
|
|
- } catch (Throwable t) {
|
|
|
- indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name());
|
|
|
} finally {
|
|
|
- try {
|
|
|
- if (context != null) {
|
|
|
- freeContext(context.id());
|
|
|
- cleanContext(context);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
+ latch.countDown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
});
|
|
|
}
|
|
|
- return new TerminationHandle() {
|
|
|
- @Override
|
|
|
- public void awaitTermination() throws InterruptedException {
|
|
|
- latch.await();
|
|
|
- }
|
|
|
- };
|
|
|
+ return () -> latch.await();
|
|
|
}
|
|
|
}
|
|
|
|