@@ -20,8 +20,12 @@
 package org.elasticsearch.search;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.SimpleMergedSegmentWarmer;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cache.recycler.CacheRecycler;
@@ -39,12 +43,19 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.fielddata.FieldDataType;
+import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.FieldMapper;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
 import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
 import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.indices.IndicesLifecycle;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.warmer.IndicesWarmer;
+import org.elasticsearch.indices.warmer.IndicesWarmer.WarmerContext;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.dfs.CachedDfSource;
 import org.elasticsearch.search.dfs.DfsPhase;
@@ -60,6 +71,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -125,6 +137,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
 
         this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
 
+        this.indicesWarmer.addListener(new IndexReaderWarmer());
+        this.indicesWarmer.addListener(new FieldDataWarmer());
         this.indicesWarmer.addListener(new SearchWarmer());
     }
 
@@ -618,36 +632,157 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         }
     }
 
+    static class IndexReaderWarmer extends IndicesWarmer.Listener {
+
+        private final SimpleMergedSegmentWarmer warmer = new SimpleMergedSegmentWarmer(InfoStream.NO_OUTPUT);
+
+        @Override
+        public void warm(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool) {
+            long start = System.nanoTime();
+            try {
+                for (AtomicReaderContext ctx : context.newSearcher().reader().leaves()) {
+                    warmer.warm(ctx.reader());
+                }
+                if (indexShard.warmerService().logger().isTraceEnabled()) {
+                    indexShard.warmerService().logger().trace("warmed readers, took [{}]", TimeValue.timeValueNanos(System.nanoTime() - start));
+                }
+            } catch (Throwable t) {
+                throw new ElasticSearchIllegalStateException("Unexpected exception while warming-up segment", t);
+            }
+        }
+
+    }
+
+    static class FieldDataWarmer extends IndicesWarmer.Listener {
+
+        @Override
+        public void warm(final IndexShard indexShard, IndexMetaData indexMetaData, final WarmerContext context, ThreadPool threadPool) {
+            final MapperService mapperService = indexShard.mapperService();
+            final Map<String, FieldMapper<?>> warmUp = new HashMap<String, FieldMapper<?>>();
+            boolean parentChild = false;
+            for (DocumentMapper docMapper : mapperService) {
+                for (FieldMapper<?> fieldMapper : docMapper.mappers().mappers()) {
+                    if (fieldMapper instanceof ParentFieldMapper) {
+                        parentChild = true;
+                    }
+                    final FieldDataType fieldDataType = fieldMapper.fieldDataType();
+                    if (fieldDataType == null) {
+                        continue;
+                    }
+                    if (fieldDataType.getLoading() != FieldDataType.Loading.EAGER) {
+                        continue;
+                    }
+                    final String indexName = fieldMapper.names().indexName();
+                    if (warmUp.containsKey(indexName)) {
+                        continue;
+                    }
+                    warmUp.put(indexName, fieldMapper);
+                }
+            }
+            final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
+            final int numTasks = warmUp.size() * context.newSearcher().reader().leaves().size() + (parentChild ? 1 : 0);
+            final CountDownLatch latch = new CountDownLatch(numTasks);
+            for (final AtomicReaderContext ctx : context.newSearcher().reader().leaves()) {
+                for (final FieldMapper<?> fieldMapper : warmUp.values()) {
+                    threadPool.executor(executor()).execute(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                final long start = System.nanoTime();
+                                indexFieldDataService.getForField(fieldMapper).load(ctx);
+                                if (indexShard.warmerService().logger().isTraceEnabled()) {
+                                    indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldMapper.names().name(), TimeValue.timeValueNanos(System.nanoTime() - start));
+                                }
+                            } catch (Throwable t) {
+                                indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldMapper.names().name());
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
+
+                    });
+                }
+            }
+
+            if (parentChild) {
+                threadPool.executor(executor()).execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            final long start = System.nanoTime();
+                            indexShard.indexService().cache().idCache().refresh(context.newSearcher().reader().leaves());
+                            if (indexShard.warmerService().logger().isTraceEnabled()) {
+                                indexShard.warmerService().logger().trace("warmed id_cache, took [{}]", TimeValue.timeValueNanos(System.nanoTime() - start));
+                            }
+                        } catch (Throwable t) {
+                            indexShard.warmerService().logger().warn("failed to warm-up id cache", t);
+                        } finally {
+                            latch.countDown();
+                        }
+                    }
+
+                });
+            }
+
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
     class SearchWarmer extends IndicesWarmer.Listener {
 
         @Override
-        public void warm(IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext warmerContext) {
+        public void warm(final IndexShard indexShard, final IndexMetaData indexMetaData, final IndicesWarmer.WarmerContext warmerContext, ThreadPool threadPool) {
             IndexWarmersMetaData custom = indexMetaData.custom(IndexWarmersMetaData.TYPE);
             if (custom == null) {
                 return;
             }
-            for (IndexWarmersMetaData.Entry entry : custom.entries()) {
-                SearchContext context = null;
-                try {
-                    long now = System.nanoTime();
-                    ShardSearchRequest request = new ShardSearchRequest(indexShard.shardId().index().name(), indexShard.shardId().id(), indexMetaData.numberOfShards(),
-                            SearchType.QUERY_THEN_FETCH /* we don't use COUNT so sorting will also kick in whatever warming logic*/)
-                            .source(entry.source())
-                            .types(entry.types());
-                    context = createContext(request, warmerContext.newSearcher());
-                    queryPhase.execute(context);
-                    long took = System.nanoTime() - now;
-                    if (indexShard.warmerService().logger().isTraceEnabled()) {
-                        indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
+            final CountDownLatch latch = new CountDownLatch(custom.entries().size());
+            for (final IndexWarmersMetaData.Entry entry : custom.entries()) {
+                threadPool.executor(executor()).execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        SearchContext context = null;
+                        try {
+                            long now = System.nanoTime();
+                            ShardSearchRequest request = new ShardSearchRequest(indexShard.shardId().index().name(), indexShard.shardId().id(), indexMetaData.numberOfShards(),
+                                    SearchType.QUERY_THEN_FETCH /* we don't use COUNT so sorting will also kick in whatever warming logic*/)
+                                    .source(entry.source())
+                                    .types(entry.types());
+                            context = createContext(request, warmerContext.newSearcher());
+                            queryPhase.execute(context);
+                            long took = System.nanoTime() - now;
+                            if (indexShard.warmerService().logger().isTraceEnabled()) {
+                                indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
+                            }
+                        } catch (Throwable t) {
+                            indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name());
+                        } finally {
+                            try {
+                                if (context != null) {
+                                    freeContext(context);
+                                    cleanContext(context);
+                                }
+                            } finally {
+                                latch.countDown();
+                            }
+                        }
                     }
-                } catch (Throwable t) {
-                    indexShard.warmerService().logger().warn("warmer [{}] failed", t, entry.name());
-                } finally {
-                    if (context != null) {
-                        freeContext(context);
-                        cleanContext(context);
-                    }
-                }
+
+                });
+            }
+
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
         }
     }