SearchPhaseController.java

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.search.controller;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.google.common.collect.Lists;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.InternalFacet;
import org.elasticsearch.search.facet.InternalFacets;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
import java.util.*;

/**
 * Coordinates the reduce side of a distributed search request: aggregating
 * distributed term frequencies (dfs), sorting and merging per-shard top docs,
 * and merging facets, aggregations, suggestions and fetched hits into a single
 * {@link InternalSearchResponse}.
 */
public class SearchPhaseController extends AbstractComponent {
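
    /**
     * Sorts shard results by index name, then by shard id, giving a stable,
     * deterministic order in which per-shard results are merged.
     */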
    public static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = new Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>>() {
        @Override
        public int compare(AtomicArray.Entry<? extends QuerySearchResultProvider> o1, AtomicArray.Entry<? extends QuerySearchResultProvider> o2) {
            int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
            if (i == 0) {
                i = o1.value.shardTarget().shardId() - o2.value.shardTarget().shardId();
            }
            return i;
        }
    };

    public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

    private final CacheRecycler cacheRecycler;
    private final BigArrays bigArrays;
    private final boolean optimizeSingleShard;

    @Inject
    public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays) {
        super(settings);
        this.cacheRecycler = cacheRecycler;
        this.bigArrays = bigArrays;
        this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
    }

    public boolean optimizeSingleShard() {
        return optimizeSingleShard;
    }
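
    /**
     * Aggregates the per-shard term and field statistics gathered during the dfs phase
     * into a single {@link AggregatedDfs}: document frequencies and collection statistics
     * are summed across shards (keeping -1 for statistics a shard did not provide), so
     * that scoring can use index-wide frequencies.
     */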
    public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {
        ObjectObjectOpenHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap();
        ObjectObjectOpenHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
        long aggMaxDoc = 0;
        for (AtomicArray.Entry<DfsSearchResult> lEntry : results.asList()) {
            final Term[] terms = lEntry.value.terms();
            final TermStatistics[] stats = lEntry.value.termStatistics();
            assert terms.length == stats.length;
            for (int i = 0; i < terms.length; i++) {
                assert terms[i] != null;
                TermStatistics existing = termStatistics.get(terms[i]);
                if (existing != null) {
                    assert terms[i].bytes().equals(existing.term());
                    // totalTermFrequency is an optional statistic; we need to check whether one or both
                    // sides are set to -1 (meaning not present), and if so set it globally to -1
                    termStatistics.put(terms[i], new TermStatistics(existing.term(),
                            existing.docFreq() + stats[i].docFreq(),
                            optionalSum(existing.totalTermFreq(), stats[i].totalTermFreq())));
                } else {
                    termStatistics.put(terms[i], stats[i]);
                }
            }
            final boolean[] states = lEntry.value.fieldStatistics().allocated;
            final Object[] keys = lEntry.value.fieldStatistics().keys;
            final Object[] values = lEntry.value.fieldStatistics().values;
            for (int i = 0; i < states.length; i++) {
                if (states[i]) {
                    String key = (String) keys[i];
                    CollectionStatistics value = (CollectionStatistics) values[i];
                    assert key != null;
                    CollectionStatistics existing = fieldStatistics.get(key);
                    if (existing != null) {
                        CollectionStatistics merged = new CollectionStatistics(
                                key, existing.maxDoc() + value.maxDoc(),
                                optionalSum(existing.docCount(), value.docCount()),
                                optionalSum(existing.sumTotalTermFreq(), value.sumTotalTermFreq()),
                                optionalSum(existing.sumDocFreq(), value.sumDocFreq())
                        );
                        fieldStatistics.put(key, merged);
                    } else {
                        fieldStatistics.put(key, value);
                    }
                }
            }
            aggMaxDoc += lEntry.value.maxDoc();
        }
        return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
    }
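
    /**
     * Sums two optional statistics: if either value is -1 (meaning "not present"),
     * the result is -1, otherwise the plain sum.
     */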
    private static long optionalSum(long left, long right) {
        return Math.min(left, right) == -1 ? -1 : left + right;
    }
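
    /**
     * Sorts the given shard results, delegating to {@link #sortDocsForScroll(AtomicArray)}
     * for scroll requests (unless the classic sort is explicitly requested) and to
     * {@link #sortDocs(AtomicArray)} otherwise.
     */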
    public ScoreDoc[] sortDocs(SearchRequest request, boolean useClassicSort, AtomicArray<? extends QuerySearchResultProvider> firstResults) throws IOException {
        if (!useClassicSort && request.scroll() != null) {
            return sortDocsForScroll(firstResults);
        } else {
            return sortDocs(firstResults);
        }
    }
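
    /**
     * Merges the per-shard top docs of a scroll request into a single sorted list via
     * {@link TopDocs#merge}. If all hits came from a single shard, the merge is skipped
     * and that shard's docs are returned directly, tagged with their shard index.
     */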
    public ScoreDoc[] sortDocsForScroll(AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
        if (results.isEmpty()) {
            return EMPTY_DOCS;
        }

        if (optimizeSingleShard) {
            boolean canOptimize = false;
            QuerySearchResult result = null;
            int shardIndex = -1;
            if (results.size() == 1) {
                canOptimize = true;
                result = results.get(0).value.queryResult();
                shardIndex = results.get(0).index;
            } else {
                // let's see if we only got hits from a single shard, if so, we can optimize...
                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
                    if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
                        if (result != null) { // we already have one, can't really optimize
                            canOptimize = false;
                            break;
                        }
                        canOptimize = true;
                        result = entry.value.queryResult();
                        shardIndex = entry.index;
                    }
                }
            }
            if (canOptimize) {
                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
                if (scoreDocs.length == 0) {
                    return EMPTY_DOCS;
                }
                int resultDocsSize = scoreDocs.length < result.size() ? scoreDocs.length : result.size();
                ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
                for (int i = 0; i < resultDocsSize; i++) {
                    ScoreDoc scoreDoc = scoreDocs[i];
                    scoreDoc.shardIndex = shardIndex;
                    docs[i] = scoreDoc;
                }
                return docs;
            }
        }

        @SuppressWarnings("unchecked")
        AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
        Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
        QuerySearchResultProvider firstResult = sortedResults[0].value;

        final Sort sort;
        if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
            TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
            sort = new Sort(firstTopDocs.fields);
        } else {
            sort = null;
        }

        int topN = firstResult.queryResult().size();
        // Need to use the length of the resultsArr array, since the slots will be based on the position in the resultsArr array
        TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
        if (firstResult.includeFetch()) {
            // if we did both query and fetch on the same go, we have fetched all the docs from each shard already, use them...
            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
            topN *= sortedResults.length;
        }
        for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
            TopDocs topDocs = sortedResult.value.queryResult().topDocs();
            // the 'index' field is the position in the resultsArr atomic array
            shardTopDocs[sortedResult.index] = topDocs;
        }
        // TopDocs#merge can't deal with missing (null) shard TopDocs, so fill empty ones in
        for (int i = 0; i < shardTopDocs.length; i++) {
            if (shardTopDocs[i] == null) {
                shardTopDocs[i] = new TopDocs(0, EMPTY_DOCS, 0.0f);
            }
        }
        TopDocs mergedTopDocs = TopDocs.merge(sort, topN, shardTopDocs);
        return mergedTopDocs.scoreDocs;
    }
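
    /**
     * For scroll requests, returns the last doc emitted for each shard, which lets the
     * next scroll round resume after it; returns null for non-scroll requests.
     */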
    public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) {
        if (request.scroll() != null) {
            return getLastEmittedDocPerShard(sortedShardList, numShards);
        } else {
            return null;
        }
    }
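
    /**
     * Records, for every shard, the entry of the merged and sorted doc list that appears
     * last for that shard. Slots of shards that emitted no docs remain null.
     */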
    public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) {
        ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
        for (ScoreDoc scoreDoc : sortedShardList) {
            lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
        }
        return lastEmittedDocPerShard;
    }
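
    /**
     * Merges the per-shard top docs into a single sorted list using a priority queue
     * sized to "from + size", so that paging can pop exactly the requested page without
     * materializing the leading "from" docs. Falls back to a direct copy when all hits
     * came from a single shard.
     */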
    public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
        if (results.isEmpty()) {
            return EMPTY_DOCS;
        }

        if (optimizeSingleShard) {
            boolean canOptimize = false;
            QuerySearchResult result = null;
            int shardIndex = -1;
            if (results.size() == 1) {
                canOptimize = true;
                result = results.get(0).value.queryResult();
                shardIndex = results.get(0).index;
            } else {
                // let's see if we only got hits from a single shard, if so, we can optimize...
                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
                    if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
                        if (result != null) { // we already have one, can't really optimize
                            canOptimize = false;
                            break;
                        }
                        canOptimize = true;
                        result = entry.value.queryResult();
                        shardIndex = entry.index;
                    }
                }
            }
            if (canOptimize) {
                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
                if (scoreDocs.length < result.from()) {
                    return EMPTY_DOCS;
                }
                int resultDocsSize = result.size();
                if ((scoreDocs.length - result.from()) < resultDocsSize) {
                    resultDocsSize = scoreDocs.length - result.from();
                }
                int offset = result.from();
                // sorted (TopFieldDocs) and unsorted results are copied the same way,
                // so there is no need to distinguish the two cases here
                ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
                for (int i = 0; i < resultDocsSize; i++) {
                    ScoreDoc scoreDoc = scoreDocs[offset + i];
                    scoreDoc.shardIndex = shardIndex;
                    docs[i] = scoreDoc;
                }
                return docs;
            }
        }

        @SuppressWarnings("unchecked")
        AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
        Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
        QuerySearchResultProvider firstResult = sortedResults[0].value;

        int totalNumDocs = 0;
        int queueSize = firstResult.queryResult().from() + firstResult.queryResult().size();
        if (firstResult.includeFetch()) {
            // if we did both query and fetch on the same go, we have fetched all the docs from each shard already, use them...
            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
            queueSize *= sortedResults.length;
        }

        // we don't use TopDocs#merge here because with TopDocs#merge, when paginating, we need to ask for "from + size" topN
        // hits, which ends up creating a "from + size" ScoreDoc[], while in our implementation, we can actually get away with
        // just creating "size" ScoreDocs (popped in reverse order off the queue). would be nice to improve TopDocs#merge to
        // allow for it, in which case we won't need this logic...
        PriorityQueue queue;
        if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
            // sorting: first, if the type is a String, change CUSTOM to STRING so we handle nulls properly (since our CUSTOM String sorting might return null)
            TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
            for (int i = 0; i < fieldDocs.fields.length; i++) {
                boolean allValuesAreNull = true;
                boolean resolvedField = false;
                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
                    for (ScoreDoc doc : entry.value.queryResult().topDocs().scoreDocs) {
                        FieldDoc fDoc = (FieldDoc) doc;
                        if (fDoc.fields[i] != null) {
                            allValuesAreNull = false;
                            if (fDoc.fields[i] instanceof String) {
                                fieldDocs.fields[i] = new SortField(fieldDocs.fields[i].getField(), SortField.Type.STRING, fieldDocs.fields[i].getReverse());
                            }
                            resolvedField = true;
                            break;
                        }
                    }
                    if (resolvedField) {
                        break;
                    }
                }
                if (!resolvedField && allValuesAreNull && fieldDocs.fields[i].getField() != null) {
                    // we did not manage to resolve a field (and it's not score or doc, which have no field), and all the
                    // field values are null (which can only happen for STRING), so make it a STRING
                    fieldDocs.fields[i] = new SortField(fieldDocs.fields[i].getField(), SortField.Type.STRING, fieldDocs.fields[i].getReverse());
                }
            }
            queue = new ShardFieldDocSortedHitQueue(fieldDocs.fields, queueSize);
        } else {
            queue = new ScoreDocQueue(queueSize);
        }

        // we need to accumulate for all shards and then filter out the "from"
        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
            QuerySearchResult result = entry.value.queryResult();
            ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
            totalNumDocs += scoreDocs.length;
            for (ScoreDoc doc : scoreDocs) {
                doc.shardIndex = entry.index;
                if (queue.insertWithOverflow(doc) == doc) {
                    // filled the queue, break
                    break;
                }
            }
        }

        int resultDocsSize = firstResult.queryResult().size();
        if (firstResult.includeFetch()) {
            // if we did both query and fetch on the same go, we have fetched all the docs from each shard already, use them...
            resultDocsSize *= sortedResults.length;
        }
        if (totalNumDocs < queueSize) {
            resultDocsSize = totalNumDocs - firstResult.queryResult().from();
        }

        if (resultDocsSize <= 0) {
            return EMPTY_DOCS;
        }

        // we pop only resultDocsSize docs: the best "from" docs sit deepest in the queue and
        // are never popped, so what we return are the docs ranked "from" up to "from + size"
        ScoreDoc[] shardDocs = new ScoreDoc[resultDocsSize];
        for (int i = resultDocsSize - 1; i >= 0; i--) { // put docs in array
            shardDocs[i] = (ScoreDoc) queue.pop();
        }
        return shardDocs;
    }

    /**
     * Builds an array, with potential null elements, of the doc ids to load per shard.
     */
    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
        for (ScoreDoc shardDoc : shardDocs) {
            IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
            if (list == null) {
                list = new IntArrayList(); // can't be shared!, uses unsafe on it later on
                docsIdsToLoad.set(shardDoc.shardIndex, list);
            }
            list.add(shardDoc.doc);
        }
    }
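
    /**
     * Reduces query and fetch results into the final {@link InternalSearchResponse}:
     * merges facets, aggregations and suggestions across shards, computes the total hit
     * count, max score and timeout flag, and stitches the fetched hits together in the
     * order of the merged, sorted doc list.
     */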
    public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
        List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();

        if (queryResults.isEmpty()) {
            return InternalSearchResponse.empty();
        }

        QuerySearchResult firstResult = queryResults.get(0).value.queryResult();

        boolean sorted = false;
        int sortScoreIndex = -1;
        if (firstResult.topDocs() instanceof TopFieldDocs) {
            sorted = true;
            TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.topDocs();
            for (int i = 0; i < fieldDocs.fields.length; i++) {
                if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
                    sortScoreIndex = i;
                }
            }
        }

        // merge facets
        InternalFacets facets = null;
        if (!queryResults.isEmpty()) {
            // we rely on the fact that the order of facets is the same on all query results
            if (firstResult.facets() != null && firstResult.facets().facets() != null && !firstResult.facets().facets().isEmpty()) {
                List<Facet> aggregatedFacets = Lists.newArrayList();
                List<Facet> namedFacets = Lists.newArrayList();
                for (Facet facet : firstResult.facets()) {
                    // gather each facet's shard-level instances into a single list, then reduce them
                    namedFacets.clear();
                    for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
                        for (Facet facet1 : entry.value.queryResult().facets()) {
                            if (facet.getName().equals(facet1.getName())) {
                                namedFacets.add(facet1);
                            }
                        }
                    }
                    if (!namedFacets.isEmpty()) {
                        Facet aggregatedFacet = ((InternalFacet) namedFacets.get(0)).reduce(new InternalFacet.ReduceContext(cacheRecycler, namedFacets));
                        aggregatedFacets.add(aggregatedFacet);
                    }
                }
                facets = new InternalFacets(aggregatedFacets);
            }
        }

        // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
        long totalHits = 0;
        float maxScore = Float.NEGATIVE_INFINITY;
        boolean timedOut = false;
        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
            QuerySearchResult result = entry.value.queryResult();
            if (result.searchTimedOut()) {
                timedOut = true;
            }
            totalHits += result.topDocs().totalHits;
            if (!Float.isNaN(result.topDocs().getMaxScore())) {
                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
            }
        }
        if (Float.isInfinite(maxScore)) {
            maxScore = Float.NaN;
        }

        // reset the fetch counters
        for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
            entry.value.fetchResult().initCounter();
        }

        // merge hits
        List<InternalSearchHit> hits = new ArrayList<>();
        if (!fetchResults.isEmpty()) {
            for (ScoreDoc shardDoc : sortedDocs) {
                FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
                if (fetchResultProvider == null) {
                    continue;
                }
                FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
                int index = fetchResult.counterGetAndIncrement();
                if (index < fetchResult.hits().internalHits().length) {
                    InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
                    searchHit.score(shardDoc.score);
                    searchHit.shard(fetchResult.shardTarget());
                    if (sorted) {
                        FieldDoc fieldDoc = (FieldDoc) shardDoc;
                        searchHit.sortValues(fieldDoc.fields);
                        if (sortScoreIndex != -1) {
                            searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                        }
                    }
                    hits.add(searchHit);
                }
            }
        }

        // merge suggest results
        Suggest suggest = null;
        if (!queryResults.isEmpty()) {
            final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
            boolean hasSuggestions = false;
            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
                Suggest shardResult = entry.value.queryResult().suggest();
                if (shardResult == null) {
                    continue;
                }
                hasSuggestions = true;
                Suggest.group(groupedSuggestions, shardResult);
            }
            suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions)) : null;
        }

        // merge aggregations
        InternalAggregations aggregations = null;
        if (!queryResults.isEmpty()) {
            if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
                List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
                    aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
                }
                aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
            }
        }

        InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
        return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut);
    }
}