// SearchPhaseController.java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
  19. package org.elasticsearch.search.controller;
  20. import com.carrotsearch.hppc.IntArrayList;
  21. import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
  22. import org.apache.lucene.index.Term;
  23. import org.apache.lucene.search.*;
  24. import org.elasticsearch.action.search.SearchRequest;
  25. import org.elasticsearch.cache.recycler.CacheRecycler;
  26. import org.elasticsearch.common.collect.HppcMaps;
  27. import org.elasticsearch.common.component.AbstractComponent;
  28. import org.elasticsearch.common.inject.Inject;
  29. import org.elasticsearch.common.lucene.Lucene;
  30. import org.elasticsearch.common.settings.Settings;
  31. import org.elasticsearch.common.util.BigArrays;
  32. import org.elasticsearch.common.util.concurrent.AtomicArray;
  33. import org.elasticsearch.script.ScriptService;
  34. import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
  35. import org.elasticsearch.search.aggregations.InternalAggregations;
  36. import org.elasticsearch.search.dfs.AggregatedDfs;
  37. import org.elasticsearch.search.dfs.DfsSearchResult;
  38. import org.elasticsearch.search.fetch.FetchSearchResult;
  39. import org.elasticsearch.search.fetch.FetchSearchResultProvider;
  40. import org.elasticsearch.search.internal.InternalSearchHit;
  41. import org.elasticsearch.search.internal.InternalSearchHits;
  42. import org.elasticsearch.search.internal.InternalSearchResponse;
  43. import org.elasticsearch.search.query.QuerySearchResult;
  44. import org.elasticsearch.search.query.QuerySearchResultProvider;
  45. import org.elasticsearch.search.suggest.Suggest;
  46. import java.io.IOException;
  47. import java.util.*;
  48. /**
  49. *
  50. */
  51. public class SearchPhaseController extends AbstractComponent {
  52. public static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = new Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>>() {
  53. @Override
  54. public int compare(AtomicArray.Entry<? extends QuerySearchResultProvider> o1, AtomicArray.Entry<? extends QuerySearchResultProvider> o2) {
  55. int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
  56. if (i == 0) {
  57. i = o1.value.shardTarget().shardId() - o2.value.shardTarget().shardId();
  58. }
  59. return i;
  60. }
  61. };
  62. public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
  63. private final BigArrays bigArrays;
  64. private final boolean optimizeSingleShard;
  65. private ScriptService scriptService;
  66. @Inject
  67. public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays, ScriptService scriptService) {
  68. super(settings);
  69. this.bigArrays = bigArrays;
  70. this.scriptService = scriptService;
  71. this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
  72. }
  73. public boolean optimizeSingleShard() {
  74. return optimizeSingleShard;
  75. }
  76. public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {
  77. ObjectObjectOpenHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap();
  78. ObjectObjectOpenHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
  79. long aggMaxDoc = 0;
  80. for (AtomicArray.Entry<DfsSearchResult> lEntry : results.asList()) {
  81. final Term[] terms = lEntry.value.terms();
  82. final TermStatistics[] stats = lEntry.value.termStatistics();
  83. assert terms.length == stats.length;
  84. for (int i = 0; i < terms.length; i++) {
  85. assert terms[i] != null;
  86. TermStatistics existing = termStatistics.get(terms[i]);
  87. if (existing != null) {
  88. assert terms[i].bytes().equals(existing.term());
  89. // totalTermFrequency is an optional statistic we need to check if either one or both
  90. // are set to -1 which means not present and then set it globally to -1
  91. termStatistics.put(terms[i], new TermStatistics(existing.term(),
  92. existing.docFreq() + stats[i].docFreq(),
  93. optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
  94. } else {
  95. termStatistics.put(terms[i], stats[i]);
  96. }
  97. }
  98. final boolean[] states = lEntry.value.fieldStatistics().allocated;
  99. final Object[] keys = lEntry.value.fieldStatistics().keys;
  100. final Object[] values = lEntry.value.fieldStatistics().values;
  101. for (int i = 0; i < states.length; i++) {
  102. if (states[i]) {
  103. String key = (String) keys[i];
  104. CollectionStatistics value = (CollectionStatistics) values[i];
  105. assert key != null;
  106. CollectionStatistics existing = fieldStatistics.get(key);
  107. if (existing != null) {
  108. CollectionStatistics merged = new CollectionStatistics(
  109. key, existing.maxDoc() + value.maxDoc(),
  110. optionalSum(existing.docCount(), value.docCount()),
  111. optionalSum(existing.sumTotalTermFreq(), value.sumTotalTermFreq()),
  112. optionalSum(existing.sumDocFreq(), value.sumDocFreq())
  113. );
  114. fieldStatistics.put(key, merged);
  115. } else {
  116. fieldStatistics.put(key, value);
  117. }
  118. }
  119. }
  120. aggMaxDoc += lEntry.value.maxDoc();
  121. }
  122. return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
  123. }
  124. private static long optionalSum(long left, long right) {
  125. return Math.min(left, right) == -1 ? -1 : left + right;
  126. }
  127. /**
  128. * @param scrollSort Whether to ignore the from and sort all hits in each shard result. Only used for scroll search
  129. * @param resultsArr Shard result holder
  130. */
  131. public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
  132. List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
  133. if (results.isEmpty()) {
  134. return EMPTY_DOCS;
  135. }
  136. if (optimizeSingleShard) {
  137. boolean canOptimize = false;
  138. QuerySearchResult result = null;
  139. int shardIndex = -1;
  140. if (results.size() == 1) {
  141. canOptimize = true;
  142. result = results.get(0).value.queryResult();
  143. shardIndex = results.get(0).index;
  144. } else {
  145. // lets see if we only got hits from a single shard, if so, we can optimize...
  146. for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
  147. if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
  148. if (result != null) { // we already have one, can't really optimize
  149. canOptimize = false;
  150. break;
  151. }
  152. canOptimize = true;
  153. result = entry.value.queryResult();
  154. shardIndex = entry.index;
  155. }
  156. }
  157. }
  158. if (canOptimize) {
  159. int offset = result.from();
  160. if (scrollSort) {
  161. offset = 0;
  162. }
  163. ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
  164. if (scoreDocs.length == 0 || scoreDocs.length < offset) {
  165. return EMPTY_DOCS;
  166. }
  167. int resultDocsSize = result.size();
  168. if ((scoreDocs.length - offset) < resultDocsSize) {
  169. resultDocsSize = scoreDocs.length - offset;
  170. }
  171. ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
  172. for (int i = 0; i < resultDocsSize; i++) {
  173. ScoreDoc scoreDoc = scoreDocs[offset + i];
  174. scoreDoc.shardIndex = shardIndex;
  175. docs[i] = scoreDoc;
  176. }
  177. return docs;
  178. }
  179. }
  180. @SuppressWarnings("unchecked")
  181. AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
  182. Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
  183. QuerySearchResultProvider firstResult = sortedResults[0].value;
  184. final Sort sort;
  185. if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
  186. TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
  187. sort = new Sort(firstTopDocs.fields);
  188. } else {
  189. sort = null;
  190. }
  191. int topN = firstResult.queryResult().size();
  192. // Need to use the length of the resultsArr array, since the slots will be based on the position in the resultsArr array
  193. TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
  194. if (firstResult.includeFetch()) {
  195. // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
  196. // this is also important since we shortcut and fetch only docs from "from" and up to "size"
  197. topN *= sortedResults.length;
  198. }
  199. for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
  200. TopDocs topDocs = sortedResult.value.queryResult().topDocs();
  201. // the 'index' field is the position in the resultsArr atomic array
  202. shardTopDocs[sortedResult.index] = topDocs;
  203. }
  204. int from = firstResult.queryResult().from();
  205. if (scrollSort) {
  206. from = 0;
  207. }
  208. // TopDocs#merge can't deal with null shard TopDocs
  209. for (int i = 0; i < shardTopDocs.length; i++) {
  210. if (shardTopDocs[i] == null) {
  211. shardTopDocs[i] = Lucene.EMPTY_TOP_DOCS;
  212. }
  213. }
  214. TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
  215. return mergedTopDocs.scoreDocs;
  216. }
  217. public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) {
  218. if (request.scroll() != null) {
  219. return getLastEmittedDocPerShard(sortedShardList, numShards);
  220. } else {
  221. return null;
  222. }
  223. }
  224. public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) {
  225. ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
  226. for (ScoreDoc scoreDoc : sortedShardList) {
  227. lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
  228. }
  229. return lastEmittedDocPerShard;
  230. }
  231. /**
  232. * Builds an array, with potential null elements, with docs to load.
  233. */
  234. public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
  235. for (ScoreDoc shardDoc : shardDocs) {
  236. IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
  237. if (list == null) {
  238. list = new IntArrayList(); // can't be shared!, uses unsafe on it later on
  239. docsIdsToLoad.set(shardDoc.shardIndex, list);
  240. }
  241. list.add(shardDoc.doc);
  242. }
  243. }
  244. public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
  245. List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
  246. List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
  247. if (queryResults.isEmpty()) {
  248. return InternalSearchResponse.empty();
  249. }
  250. QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
  251. boolean sorted = false;
  252. int sortScoreIndex = -1;
  253. if (firstResult.topDocs() instanceof TopFieldDocs) {
  254. sorted = true;
  255. TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
  256. for (int i = 0; i < fieldDocs.fields.length; i++) {
  257. if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
  258. sortScoreIndex = i;
  259. }
  260. }
  261. }
  262. // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
  263. long totalHits = 0;
  264. float maxScore = Float.NEGATIVE_INFINITY;
  265. boolean timedOut = false;
  266. Boolean terminatedEarly = null;
  267. for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
  268. QuerySearchResult result = entry.value.queryResult();
  269. if (result.searchTimedOut()) {
  270. timedOut = true;
  271. }
  272. if (result.terminatedEarly() != null) {
  273. if (terminatedEarly == null) {
  274. terminatedEarly = result.terminatedEarly();
  275. } else if (result.terminatedEarly()) {
  276. terminatedEarly = true;
  277. }
  278. }
  279. totalHits += result.topDocs().totalHits;
  280. if (!Float.isNaN(result.topDocs().getMaxScore())) {
  281. maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
  282. }
  283. }
  284. if (Float.isInfinite(maxScore)) {
  285. maxScore = Float.NaN;
  286. }
  287. // clean the fetch counter
  288. for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
  289. entry.value.fetchResult().initCounter();
  290. }
  291. // merge hits
  292. List<InternalSearchHit> hits = new ArrayList<>();
  293. if (!fetchResults.isEmpty()) {
  294. for (ScoreDoc shardDoc : sortedDocs) {
  295. FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
  296. if (fetchResultProvider == null) {
  297. continue;
  298. }
  299. FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
  300. int index = fetchResult.counterGetAndIncrement();
  301. if (index < fetchResult.hits().internalHits().length) {
  302. InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
  303. searchHit.score(shardDoc.score);
  304. searchHit.shard(fetchResult.shardTarget());
  305. if (sorted) {
  306. FieldDoc fieldDoc = (FieldDoc) shardDoc;
  307. searchHit.sortValues(fieldDoc.fields);
  308. if (sortScoreIndex != -1) {
  309. searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
  310. }
  311. }
  312. hits.add(searchHit);
  313. }
  314. }
  315. }
  316. // merge suggest results
  317. Suggest suggest = null;
  318. if (!queryResults.isEmpty()) {
  319. final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
  320. boolean hasSuggestions = false;
  321. for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
  322. Suggest shardResult = entry.value.queryResult().queryResult().suggest();
  323. if (shardResult == null) {
  324. continue;
  325. }
  326. hasSuggestions = true;
  327. Suggest.group(groupedSuggestions, shardResult);
  328. }
  329. suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions)) : null;
  330. }
  331. // merge addAggregation
  332. InternalAggregations aggregations = null;
  333. if (!queryResults.isEmpty()) {
  334. if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
  335. List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
  336. for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
  337. aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
  338. }
  339. aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
  340. }
  341. }
  342. InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
  343. return new InternalSearchResponse(searchHits, aggregations, suggest, timedOut, terminatedEarly);
  344. }
  345. }