Pārlūkot izejas kodu

Merge pull request #19536 from areek/enhancement/completion_suggester_documents

Add support for returning documents with completion suggester
Areek Zillur 9 gadi atpakaļ
vecāks
revīzija
469eb2546d
20 mainītis faili ar 918 papildinājumiem un 206 dzēšanām
  1. 9 9
      core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
  2. 2 2
      core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java
  3. 8 11
      core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java
  4. 4 7
      core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
  5. 8 11
      core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java
  6. 2 2
      core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
  7. 6 5
      core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
  8. 39 20
      core/src/main/java/org/elasticsearch/search/SearchService.java
  9. 179 77
      core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
  10. 0 3
      core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java
  11. 3 1
      core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java
  12. 11 4
      core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
  13. 39 23
      core/src/main/java/org/elasticsearch/search/suggest/Suggest.java
  14. 1 1
      core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java
  15. 106 25
      core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java
  16. 234 0
      core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java
  17. 113 0
      core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java
  18. 73 0
      core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java
  19. 61 0
      core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java
  20. 20 5
      docs/reference/search/suggesters/completion-suggest.asciidoc

+ 9 - 9
core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -46,6 +46,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
+import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.List;
@@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
     protected final AtomicArray<FirstResult> firstResults;
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     private final Object shardFailuresMutex = new Object();
-    protected volatile ScoreDoc[] sortedShardList;
+    protected volatile ScoreDoc[] sortedShardDocs;
 
     protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
                                         IndexNameExpressionResolver indexNameExpressionResolver,
@@ -321,8 +322,11 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
         // we only release search context that we did not fetch from if we are not scrolling
         if (request.scroll() == null) {
             for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
-                final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
-                if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
+                QuerySearchResult queryResult = entry.value.queryResult().queryResult();
+                final TopDocs topDocs = queryResult.topDocs();
+                final Suggest suggest = queryResult.suggest();
+                if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches
+                    ||suggest != null && suggest.hasScoreDocs()) // or had suggest docs
                     && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
                     try {
                         DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
@@ -343,12 +347,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
 
     protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
                                                          ScoreDoc[] lastEmittedDocPerShard) {
-        if (lastEmittedDocPerShard != null) {
-            ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
-            return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
-        } else {
-            return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
-        }
+        final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null;
+        return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
     }
 
     protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,

+ 2 - 2
core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java

@@ -118,8 +118,8 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+                sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
                     queryFetchResults);
                 String scrollId = null;
                 if (request.scroll() != null) {

+ 8 - 11
core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

@@ -135,18 +135,17 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
     }
 
     void innerExecuteFetchPhase() throws Exception {
-        boolean useScroll = request.scroll() != null;
-        sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        final boolean isScrollRequest = request.scroll() != null;
+        sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
             return;
         }
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
-            request, sortedShardList, firstResults.length()
-        );
+        final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
+            searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             QuerySearchResult queryResult = queryResults.get(entry.index);
@@ -196,12 +195,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
                     fetchResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                     buildTookInMillis(), buildShardFailures()));
                 releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);

+ 4 - 7
core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java

@@ -60,14 +60,11 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                boolean useScroll = request.scroll() != null;
-                sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
                     firstResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                     buildTookInMillis(), buildShardFailures()));
             }

+ 8 - 11
core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

@@ -68,18 +68,17 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
 
     @Override
     protected void moveToSecondPhase() throws Exception {
-        boolean useScroll = request.scroll() != null;
-        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        final boolean isScrollRequest = request.scroll() != null;
+        sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
             return;
         }
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
-            request, sortedShardList, firstResults.length()
-        );
+        final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+            searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null;
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             QuerySearchResultProvider queryResult = firstResults.get(entry.index);
@@ -129,12 +128,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
                     fetchResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                     successfulOps.get(), buildTookInMillis(), buildShardFailures()));
                 releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);

+ 2 - 2
core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

@@ -168,8 +168,8 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
     }
 
     private void innerFinishHim() throws Exception {
-        ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
-        final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
             queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {

+ 6 - 5
core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

@@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     final AtomicArray<QuerySearchResult> queryResults;
     final AtomicArray<FetchSearchResult> fetchResults;
-    private volatile ScoreDoc[] sortedShardList;
+    private volatile ScoreDoc[] sortedShardDocs;
     private final AtomicInteger successfulOps;
 
     SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
@@ -165,9 +165,9 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     }
 
     private void executeFetchPhase() throws Exception {
-        sortedShardList = searchPhaseController.sortDocs(true, queryResults);
+        sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
         AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
@@ -175,7 +175,8 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
         }
 
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
+        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
+            sortedShardDocs, queryResults.length());
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             IntArrayList docIds = entry.value;
@@ -216,7 +217,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     }
 
     private void innerFinishHim() {
-        InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+        InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
         String scrollId = null;
         if (request.scroll() != null) {
             scrollId = request.scrollId();

+ 39 - 20
core/src/main/java/org/elasticsearch/search/SearchService.java

@@ -21,6 +21,7 @@ package org.elasticsearch.search;
 
 import com.carrotsearch.hppc.ObjectFloatHashMap;
 import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -87,6 +88,8 @@ import org.elasticsearch.search.rescore.RescoreBuilder;
 import org.elasticsearch.search.searchafter.SearchAfterBuilder;
 import org.elasticsearch.search.sort.SortAndFormats;
 import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Cancellable;
 import org.elasticsearch.threadpool.ThreadPool.Names;
@@ -94,6 +97,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -265,7 +269,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
             loadOrExecuteQueryPhase(request, context);
 
-            if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+            if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
                 freeContext(context.id());
             } else {
                 contextProcessedSuccessfully(context);
@@ -320,7 +324,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             operationListener.onPreQueryPhase(context);
             long time = System.nanoTime();
             queryPhase.execute(context);
-            if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+            if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
                 // no hits, we can release the context since there will be no fetch phase
                 freeContext(context.id());
             } else {
@@ -811,40 +815,55 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
     }
 
-    private static final int[] EMPTY_DOC_IDS = new int[0];
-
     /**
      * Shortcut ids to load, we load only "from" and up to "size". The phase controller
      * handles this as well since the result is always size * shards for Q_A_F
      */
     private void shortcutDocIdsToLoad(SearchContext context) {
+        final int[] docIdsToLoad;
+        int docsOffset = 0;
+        final Suggest suggest = context.queryResult().suggest();
+        int numSuggestDocs = 0;
+        final List<CompletionSuggestion> completionSuggestions;
+        if (suggest != null && suggest.hasScoreDocs()) {
+            completionSuggestions = suggest.filter(CompletionSuggestion.class);
+            for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+                numSuggestDocs += completionSuggestion.getOptions().size();
+            }
+        } else {
+            completionSuggestions = Collections.emptyList();
+        }
         if (context.request().scroll() != null) {
             TopDocs topDocs = context.queryResult().topDocs();
-            int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
+            docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
             for (int i = 0; i < topDocs.scoreDocs.length; i++) {
-                docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
+                docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
             }
-            context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
         } else {
             TopDocs topDocs = context.queryResult().topDocs();
             if (topDocs.scoreDocs.length < context.from()) {
                 // no more docs...
-                context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
-                return;
-            }
-            int totalSize = context.from() + context.size();
-            int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
-            int counter = 0;
-            for (int i = context.from(); i < totalSize; i++) {
-                if (i < topDocs.scoreDocs.length) {
-                    docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
-                } else {
-                    break;
+                docIdsToLoad = new int[numSuggestDocs];
+            } else {
+                int totalSize = context.from() + context.size();
+                docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
+                    numSuggestDocs];
+                for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
+                    docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
                 }
-                counter++;
             }
-            context.docIdsToLoad(docIdsToLoad, 0, counter);
         }
+        for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+            for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+                docIdsToLoad[docsOffset++] = option.getDoc().doc;
+            }
+        }
+        context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
+    }
+
+    private static boolean hasHits(final QuerySearchResult searchResult) {
+        return searchResult.topDocs().scoreDocs.length > 0 ||
+            (searchResult.suggest() != null && searchResult.suggest().hasScoreDocs());
     }
 
     private void processScroll(InternalScrollSearchRequest request, SearchContext context) {

+ 179 - 77
core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java

@@ -30,7 +30,6 @@ import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TermStatistics;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -53,18 +52,22 @@ import org.elasticsearch.search.internal.InternalSearchHits;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.profile.ProfileShardResult;
 import org.elasticsearch.search.profile.SearchProfileShardResults;
-import org.elasticsearch.search.profile.query.QueryProfileShardResult;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
 import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.Suggest.Suggestion;
+import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -154,6 +157,10 @@ public class SearchPhaseController extends AbstractComponent {
     }
 
     /**
+     * Returns a score doc array of top N search docs across all shards, followed by top suggest docs for each
+     * named completion suggestion across all shards. If more than one named completion suggestion is specified in the
+     * request, the suggest docs for a named suggestion are ordered by the suggestion name.
+     *
      * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
      *                   Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
      * @param resultsArr Shard result holder
@@ -191,19 +198,40 @@ public class SearchPhaseController extends AbstractComponent {
                 offset = 0;
             }
             ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
-            if (scoreDocs.length == 0 || scoreDocs.length < offset) {
-                return EMPTY_DOCS;
+            ScoreDoc[] docs;
+            int numSuggestDocs = 0;
+            final Suggest suggest = result.queryResult().suggest();
+            final List<CompletionSuggestion> completionSuggestions;
+            if (suggest != null) {
+                completionSuggestions = suggest.filter(CompletionSuggestion.class);
+                for (CompletionSuggestion suggestion : completionSuggestions) {
+                    numSuggestDocs += suggestion.getOptions().size();
+                }
+            } else {
+                completionSuggestions = Collections.emptyList();
             }
-
-            int resultDocsSize = result.size();
-            if ((scoreDocs.length - offset) < resultDocsSize) {
-                resultDocsSize = scoreDocs.length - offset;
+            int docsOffset = 0;
+            if (scoreDocs.length == 0 || scoreDocs.length < offset) {
+                docs = new ScoreDoc[numSuggestDocs];
+            } else {
+                int resultDocsSize = result.size();
+                if ((scoreDocs.length - offset) < resultDocsSize) {
+                    resultDocsSize = scoreDocs.length - offset;
+                }
+                docs = new ScoreDoc[resultDocsSize + numSuggestDocs];
+                for (int i = 0; i < resultDocsSize; i++) {
+                    ScoreDoc scoreDoc = scoreDocs[offset + i];
+                    scoreDoc.shardIndex = shardIndex;
+                    docs[i] = scoreDoc;
+                    docsOffset++;
+                }
             }
-            ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
-            for (int i = 0; i < resultDocsSize; i++) {
-                ScoreDoc scoreDoc = scoreDocs[offset + i];
-                scoreDoc.shardIndex = shardIndex;
-                docs[i] = scoreDoc;
+            for (CompletionSuggestion suggestion: completionSuggestions) {
+                for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
+                    ScoreDoc doc = option.getDoc();
+                    doc.shardIndex = shardIndex;
+                    docs[docsOffset++] = doc;
+                }
             }
             return docs;
         }
@@ -213,13 +241,7 @@ public class SearchPhaseController extends AbstractComponent {
         Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
         QuerySearchResultProvider firstResult = sortedResults[0].value;
 
-        int topN = firstResult.queryResult().size();
-        if (firstResult.includeFetch()) {
-            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
-            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
-            topN *= sortedResults.length;
-        }
-
+        int topN = topN(results);
         int from = firstResult.queryResult().from();
         if (ignoreFrom) {
             from = 0;
@@ -258,40 +280,86 @@ public class SearchPhaseController extends AbstractComponent {
             }
             mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
         }
-        return mergedTopDocs.scoreDocs;
-    }
 
-    public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) {
-        if (request.scroll() != null) {
-            return getLastEmittedDocPerShard(sortedShardList, numShards);
-        } else {
-            return null;
+        ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
+        final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
+        // group suggestions and assign shard index
+        for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+            Suggest shardSuggest = sortedResult.value.queryResult().suggest();
+            if (shardSuggest != null) {
+                for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+                    suggestion.setShardIndex(sortedResult.index);
+                    List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+                        groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                    suggestions.add(suggestion);
+                }
+            }
+        }
+        if (groupedCompletionSuggestions.isEmpty() == false) {
+            int numSuggestDocs = 0;
+            List<Suggestion<? extends Entry<? extends Entry.Option>>> completionSuggestions =
+                new ArrayList<>(groupedCompletionSuggestions.size());
+            for (List<Suggestion<CompletionSuggestion.Entry>> groupedSuggestions : groupedCompletionSuggestions.values()) {
+                final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions);
+                assert completionSuggestion != null;
+                numSuggestDocs += completionSuggestion.getOptions().size();
+                completionSuggestions.add(completionSuggestion);
+            }
+            scoreDocs = new ScoreDoc[mergedTopDocs.scoreDocs.length + numSuggestDocs];
+            System.arraycopy(mergedTopDocs.scoreDocs, 0, scoreDocs, 0, mergedTopDocs.scoreDocs.length);
+            int offset = mergedTopDocs.scoreDocs.length;
+            Suggest suggestions = new Suggest(completionSuggestions);
+            for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) {
+                for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+                    scoreDocs[offset++] = option.getDoc();
+                }
+            }
         }
+        return scoreDocs;
     }
 
-    public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) {
+    public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+                                                ScoreDoc[] sortedScoreDocs, int numShards) {
         ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
-        for (ScoreDoc scoreDoc : sortedShardList) {
-            lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
+        if (queryResults.isEmpty() == false) {
+            long fetchHits = 0;
+            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
+                fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length;
+            }
+            // from is always zero as when we use scroll, we ignore from
+            long size = Math.min(fetchHits, topN(queryResults));
+            for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
+                ScoreDoc scoreDoc = sortedScoreDocs[sortedDocsIndex];
+                lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
+            }
         }
         return lastEmittedDocPerShard;
+
     }
 
     /**
      * Builds an array, with potential null elements, with docs to load.
      */
-    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
+    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docIdsToLoad, ScoreDoc[] shardDocs) {
         for (ScoreDoc shardDoc : shardDocs) {
-            IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
-            if (list == null) {
-                list = new IntArrayList(); // can't be shared!, uses unsafe on it later on
-                docsIdsToLoad.set(shardDoc.shardIndex, list);
+            IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex);
+            if (shardDocIdsToLoad == null) {
+                shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on
+                docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad);
             }
-            list.add(shardDoc.doc);
+            shardDocIdsToLoad.add(shardDoc.doc);
         }
     }
 
-    public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
+    /**
+     * Enriches search hits and completion suggestion hits from <code>sortedDocs</code> using <code>fetchResultsArr</code>,
+     * merges suggestions, aggregations and profile results
+     *
+     * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
+     * completion suggestion ordered by suggestion name
+     */
+    public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
+                                        AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
                                         AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
 
         List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
@@ -317,6 +385,7 @@ public class SearchPhaseController extends AbstractComponent {
 
         // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
         long totalHits = 0;
+        long fetchHits = 0;
         float maxScore = Float.NEGATIVE_INFINITY;
         boolean timedOut = false;
         Boolean terminatedEarly = null;
@@ -333,6 +402,7 @@ public class SearchPhaseController extends AbstractComponent {
                 }
             }
             totalHits += result.topDocs().totalHits;
+            fetchHits += result.topDocs().scoreDocs.length;
             if (!Float.isNaN(result.topDocs().getMaxScore())) {
                 maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
             }
@@ -345,11 +415,13 @@ public class SearchPhaseController extends AbstractComponent {
         for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
             entry.value.fetchResult().initCounter();
         }
-
+        int from = ignoreFrom ? 0 : firstResult.queryResult().from();
+        int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults));
         // merge hits
         List<InternalSearchHit> hits = new ArrayList<>();
         if (!fetchResults.isEmpty()) {
-            for (ScoreDoc shardDoc : sortedDocs) {
+            for (int i = 0; i < numSearchHits; i++) {
+                ScoreDoc shardDoc = sortedDocs[i];
                 FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
                 if (fetchResultProvider == null) {
                     continue;
@@ -360,7 +432,6 @@ public class SearchPhaseController extends AbstractComponent {
                     InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
                     searchHit.score(shardDoc.score);
                     searchHit.shard(fetchResult.shardTarget());
-
                     if (sorted) {
                         FieldDoc fieldDoc = (FieldDoc) shardDoc;
                         searchHit.sortValues(fieldDoc.fields, firstResult.sortValueFormats());
@@ -368,7 +439,6 @@ public class SearchPhaseController extends AbstractComponent {
                             searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                         }
                     }
-
                     hits.add(searchHit);
                 }
             }
@@ -376,38 +446,72 @@ public class SearchPhaseController extends AbstractComponent {
 
         // merge suggest results
         Suggest suggest = null;
-        if (!queryResults.isEmpty()) {
-            final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
-            boolean hasSuggestions = false;
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                Suggest shardResult = entry.value.queryResult().queryResult().suggest();
-
-                if (shardResult == null) {
-                    continue;
+        if (firstResult.suggest() != null) {
+            final Map<String, List<Suggestion>> groupedSuggestions = new HashMap<>();
+            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
+                Suggest shardSuggest = queryResult.value.queryResult().suggest();
+                if (shardSuggest != null) {
+                    for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : shardSuggest) {
+                        List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                        suggestionList.add(suggestion);
+                    }
+                }
+            }
+            if (groupedSuggestions.isEmpty() == false) {
+                suggest = new Suggest(Suggest.reduce(groupedSuggestions));
+                if (!fetchResults.isEmpty()) {
+                    int currentOffset = numSearchHits;
+                    for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) {
+                        final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
+                        for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
+                            ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
+                            FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+                            if (fetchSearchResultProvider == null) {
+                                continue;
+                            }
+                            FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult();
+                            int fetchResultIndex = fetchResult.counterGetAndIncrement();
+                            if (fetchResultIndex < fetchResult.hits().internalHits().length) {
+                                InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
+                                CompletionSuggestion.Entry.Option suggestOption =
+                                    suggestionOptions.get(scoreDocIndex - currentOffset);
+                                hit.score(shardDoc.score);
+                                hit.shard(fetchResult.shardTarget());
+                                suggestOption.setHit(hit);
+                            }
+                        }
+                        currentOffset += suggestionOptions.size();
+                    }
+                    assert currentOffset == sortedDocs.length : "expected no more score doc slices";
                 }
-                hasSuggestions = true;
-                Suggest.group(groupedSuggestions, shardResult);
             }
-
-            suggest = hasSuggestions ? new Suggest(Suggest.reduce(groupedSuggestions)) : null;
         }
 
-        // merge addAggregation
+        // merge Aggregation
         InternalAggregations aggregations = null;
-        if (!queryResults.isEmpty()) {
-            if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
-                List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
-                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                    aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
+        if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
+            List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
+            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
+                aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
+            }
+            ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
+            aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+            List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
+            if (pipelineAggregators != null) {
+                List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
+                    .map((p) -> (InternalAggregation) p)
+                    .collect(Collectors.toList());
+                for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
+                    InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
+                    newAggs.add(newAgg);
                 }
-                ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
-                aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+                aggregations = new InternalAggregations(newAggs);
             }
         }
 
         //Collect profile results
         SearchProfileShardResults shardResults = null;
-        if (!queryResults.isEmpty() && firstResult.profileResults() != null) {
+        if (firstResult.profileResults() != null) {
             Map<String, ProfileShardResult> profileResults = new HashMap<>(queryResults.size());
             for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
                 String key = entry.value.queryResult().shardTarget().toString();
@@ -416,24 +520,22 @@ public class SearchPhaseController extends AbstractComponent {
             shardResults = new SearchProfileShardResults(profileResults);
         }
 
-        if (aggregations != null) {
-            List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
-            if (pipelineAggregators != null) {
-                List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false).map((p) -> {
-                    return (InternalAggregation) p;
-                }).collect(Collectors.toList());
-                for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
-                    ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
-                    InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
-                    newAggs.add(newAgg);
-                }
-                aggregations = new InternalAggregations(newAggs);
-            }
-        }
-
         InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
 
         return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
     }
 
+    /**
+     * returns the number of top results to be considered across all shards
+     */
+    private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+        QuerySearchResultProvider firstResult = queryResults.get(0).value;
+        int topN = firstResult.queryResult().size();
+        if (firstResult.includeFetch()) {
+            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
+            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
+            topN *= queryResults.size();
+        }
+        return topN;
+    }
 }

+ 0 - 3
core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java

@@ -39,10 +39,7 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
     private OriginalIndices originalIndices;
 
     public ShardFetchSearchRequest() {
-    }
 
-    public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list) {
-        this(request, id, list, null);
     }
 
     public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {

+ 3 - 1
core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java

@@ -43,7 +43,9 @@ public final class MatchedQueriesFetchSubPhase implements FetchSubPhase {
 
     @Override
     public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
-        if (hits.length == 0) {
+        if (hits.length == 0 ||
+            // in case the request has only suggest, parsed query is null
+            context.parsedQuery() == null) {
             return;
         }
         hits = hits.clone(); // don't modify the incoming hits

+ 11 - 4
core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java

@@ -415,8 +415,8 @@ public class InternalSearchHit implements SearchHit {
         static final String INNER_HITS = "inner_hits";
     }
 
-    @Override
-    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+    // public because we render hit as part of completion suggestion option
+    public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException {
         List<SearchHitField> metaFields = new ArrayList<>();
         List<SearchHitField> otherFields = new ArrayList<>();
         if (fields != null && !fields.isEmpty()) {
@@ -432,7 +432,6 @@ public class InternalSearchHit implements SearchHit {
             }
         }
 
-        builder.startObject();
         // For inner_hit hits shard is null and that is ok, because the parent search hit has all this information.
         // Even if this was included in the inner_hit hits this would be the same, so better leave it out.
         if (explanation() != null && shard != null) {
@@ -516,7 +515,6 @@ public class InternalSearchHit implements SearchHit {
             }
             builder.endObject();
         }
-        builder.endObject();
         return builder;
     }
 
@@ -533,6 +531,15 @@ public class InternalSearchHit implements SearchHit {
             builder.endArray();
         }
         builder.endObject();
+
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        toInnerXContent(builder, params);
+        builder.endObject();
+        return builder;
     }
 
     public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {

+ 39 - 23
core/src/main/java/org/elasticsearch/search/suggest/Suggest.java

@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Top level suggest result, containing the result for each suggestion.
@@ -48,18 +49,16 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
 
     private static final String NAME = "suggest";
 
-    private static final Comparator<Option> COMPARATOR = new Comparator<Suggest.Suggestion.Entry.Option>() {
-        @Override
-        public int compare(Option first, Option second) {
-            int cmp = Float.compare(second.getScore(), first.getScore());
-            if (cmp != 0) {
-                return cmp;
-            }
-            return first.getText().compareTo(second.getText());
-         }
-    };
+    public static final Comparator<Option> COMPARATOR = (first, second) -> {
+        int cmp = Float.compare(second.getScore(), first.getScore());
+        if (cmp != 0) {
+            return cmp;
+        }
+        return first.getText().compareTo(second.getText());
+     };
 
     private List<Suggestion<? extends Entry<? extends Option>>> suggestions;
+    private boolean hasScoreDocs;
 
     private Map<String, Suggestion<? extends Entry<? extends Option>>> suggestMap;
 
@@ -68,7 +67,12 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
     }
 
     public Suggest(List<Suggestion<? extends Entry<? extends Option>>> suggestions) {
+        // we sort suggestions by their names to ensure iteration over suggestions are consistent
+        // this is needed as we need to fill in suggestion docs in SearchPhaseController#sortDocs
+        // in the same order as we enrich the suggestions with fetch results in SearchPhaseController#merge
+        suggestions.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
         this.suggestions = suggestions;
+        this.hasScoreDocs = filter(CompletionSuggestion.class).stream().anyMatch(CompletionSuggestion::hasScoreDocs);
     }
 
     @Override
@@ -97,6 +101,13 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
         return (T) suggestMap.get(name);
     }
 
+    /**
+     * Whether any suggestions had query hits
+     */
+    public boolean hasScoreDocs() {
+        return hasScoreDocs;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         final int size = in.readVInt();
@@ -125,6 +136,7 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
             suggestion.readFrom(in);
             suggestions.add(suggestion);
         }
+        hasScoreDocs = filter(CompletionSuggestion.class).stream().anyMatch(CompletionSuggestion::hasScoreDocs);
     }
 
     @Override
@@ -160,18 +172,6 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
         return result;
     }
 
-    public static Map<String, List<Suggest.Suggestion>> group(Map<String, List<Suggest.Suggestion>> groupedSuggestions, Suggest suggest) {
-        for (Suggestion<? extends Entry<? extends Option>> suggestion : suggest) {
-            List<Suggestion> list = groupedSuggestions.get(suggestion.getName());
-            if (list == null) {
-                list = new ArrayList<>();
-                groupedSuggestions.put(suggestion.getName(), list);
-            }
-            list.add(suggestion);
-        }
-        return groupedSuggestions;
-    }
-
     public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions) {
         List<Suggestion<? extends Entry<? extends Option>>> reduced = new ArrayList<>(groupedSuggestions.size());
         for (java.util.Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
@@ -193,6 +193,16 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
         return reduced;
     }
 
+    /**
+     * @return only suggestions of type <code>suggestionType</code> contained in this {@link Suggest} instance
+     */
+    public <T extends Suggestion> List<T> filter(Class<T> suggestionType) {
+         return suggestions.stream()
+            .filter(suggestion -> suggestion.getClass() == suggestionType)
+            .map(suggestion -> (T) suggestion)
+            .collect(Collectors.toList());
+    }
+
     /**
      * The suggestion responses corresponding with the suggestions in the request.
      */
@@ -238,6 +248,13 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
             return name;
         }
 
+        /**
+         * @return The number of requested suggestion option size
+         */
+        public int getSize() {
+            return size;
+        }
+
         /**
          * Merges the result of another suggestion into this suggestion.
          * For internal usage.
@@ -331,7 +348,6 @@ public class Suggest implements Iterable<Suggest.Suggestion<? extends Entry<? ex
             return builder;
         }
 
-
         /**
          * Represents a part from the suggest text with suggested options.
          */

+ 1 - 1
core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggester.java

@@ -109,7 +109,7 @@ public class CompletionSuggester extends Suggester<CompletionSuggestionContext>
                     }
                 }
                 if (numResult++ < suggestionContext.getSize()) {
-                    CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(
+                    CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(suggestDoc.doc,
                         new Text(suggestDoc.key.toString()), suggestDoc.score, contexts, payload);
                     completionSuggestEntry.addOption(option);
                 } else {

+ 106 - 25
core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java

@@ -18,11 +18,16 @@
  */
 package org.elasticsearch.search.suggest.completion;
 
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.suggest.Lookup;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.internal.InternalSearchHit;
+import org.elasticsearch.search.internal.InternalSearchHits;
+import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
 import org.elasticsearch.search.suggest.Suggest;
 
 import java.io.IOException;
@@ -35,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.search.suggest.Suggest.COMPARATOR;
+
 /**
  * Suggestion response for {@link CompletionSuggester} results
  *
@@ -62,6 +69,25 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
         super(name, size);
     }
 
+    /**
+     * @return the result options for the suggestion
+     */
+    public List<Entry.Option> getOptions() {
+        if (entries.isEmpty() == false) {
+            assert entries.size() == 1 : "CompletionSuggestion must have only one entry";
+            return entries.get(0).getOptions();
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * @return whether there is any hits for the suggestion
+     */
+    public boolean hasScoreDocs() {
+        return getOptions().size() > 0;
+    }
+
     private static final class OptionPriorityQueue extends org.apache.lucene.util.PriorityQueue<Entry.Option> {
 
         private final Comparator<Suggest.Suggestion.Entry.Option> comparator;
@@ -90,30 +116,54 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
         }
     }
 
-    @Override
-    public Suggest.Suggestion<Entry> reduce(List<Suggest.Suggestion<Entry>> toReduce) {
-        if (toReduce.size() == 1) {
-            return toReduce.get(0);
+    /**
+     * Reduces suggestions to a single suggestion containing at most
+     * top {@link CompletionSuggestion#getSize()} options across <code>toReduce</code>
+     */
+    public static CompletionSuggestion reduceTo(List<Suggest.Suggestion<Entry>> toReduce) {
+        if (toReduce.isEmpty()) {
+            return null;
         } else {
-            // combine suggestion entries from participating shards on the coordinating node
-            // the global top <code>size</code> entries are collected from the shard results
-            // using a priority queue
-            OptionPriorityQueue priorityQueue = new OptionPriorityQueue(size, sortComparator());
-            for (Suggest.Suggestion<Entry> entries : toReduce) {
-                assert entries.getEntries().size() == 1 : "CompletionSuggestion must have only one entry";
-                for (Entry.Option option : entries.getEntries().get(0)) {
-                    if (option == priorityQueue.insertWithOverflow(option)) {
-                        // if the current option has overflown from pq,
-                        // we can assume all of the successive options
-                        // from this shard result will be overflown as well
-                        break;
+            final CompletionSuggestion leader = (CompletionSuggestion) toReduce.get(0);
+            final Entry leaderEntry = leader.getEntries().get(0);
+            final String name = leader.getName();
+            if (toReduce.size() == 1) {
+                return leader;
+            } else {
+                // combine suggestion entries from participating shards on the coordinating node
+                // the global top <code>size</code> entries are collected from the shard results
+                // using a priority queue
+                OptionPriorityQueue priorityQueue = new OptionPriorityQueue(leader.getSize(), COMPARATOR);
+                for (Suggest.Suggestion<Entry> suggestion : toReduce) {
+                    assert suggestion.getName().equals(name) : "name should be identical across all suggestions";
+                    for (Entry.Option option : ((CompletionSuggestion) suggestion).getOptions()) {
+                        if (option == priorityQueue.insertWithOverflow(option)) {
+                            // if the current option has overflown from pq,
+                            // we can assume all of the successive options
+                            // from this shard result will be overflown as well
+                            break;
+                        }
                     }
                 }
+                final CompletionSuggestion suggestion = new CompletionSuggestion(leader.getName(), leader.getSize());
+                final Entry entry = new Entry(leaderEntry.getText(), leaderEntry.getOffset(), leaderEntry.getLength());
+                Collections.addAll(entry.getOptions(), priorityQueue.get());
+                suggestion.addTerm(entry);
+                return suggestion;
+            }
+        }
+    }
+
+    @Override
+    public Suggest.Suggestion<Entry> reduce(List<Suggest.Suggestion<Entry>> toReduce) {
+        return reduceTo(toReduce);
+    }
+
+    public void setShardIndex(int shardIndex) {
+        if (entries.isEmpty() == false) {
+            for (Entry.Option option : getOptions()) {
+                option.setShardIndex(shardIndex);
             }
-            Entry options = this.entries.get(0);
-            options.getOptions().clear();
-            Collections.addAll(options.getOptions(), priorityQueue.get());
-            return this;
         }
     }
 
@@ -145,9 +195,12 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
         public static class Option extends Suggest.Suggestion.Entry.Option {
             private Map<String, Set<CharSequence>> contexts;
             private Map<String, List<Object>> payload;
+            private ScoreDoc doc;
+            private InternalSearchHit hit;
 
-            public Option(Text text, float score, Map<String, Set<CharSequence>> contexts, Map<String, List<Object>> payload) {
+            public Option(int docID, Text text, float score, Map<String, Set<CharSequence>> contexts, Map<String, List<Object>> payload) {
                 super(text, score);
+                this.doc = new ScoreDoc(docID, score);
                 this.payload = payload;
                 this.contexts = contexts;
             }
@@ -171,14 +224,30 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
                 return contexts;
             }
 
-            @Override
-            public void setScore(float score) {
-                super.setScore(score);
+            public ScoreDoc getDoc() {
+                return doc;
+            }
+
+            public InternalSearchHit getHit() {
+                return hit;
+            }
+
+            public void setShardIndex(int shardIndex) {
+                this.doc.shardIndex = shardIndex;
+            }
+
+            public void setHit(InternalSearchHit hit) {
+                this.hit = hit;
             }
 
             @Override
             protected XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
-                super.innerToXContent(builder, params);
+                builder.field("text", getText());
+                if (hit != null) {
+                    hit.toInnerXContent(builder, params);
+                } else {
+                    builder.field("score", getScore());
+                }
                 if (payload.size() > 0) {
                     builder.startObject("payload");
                     for (Map.Entry<String, List<Object>> entry : payload.entrySet()) {
@@ -207,6 +276,11 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
             @Override
             public void readFrom(StreamInput in) throws IOException {
                 super.readFrom(in);
+                this.doc = Lucene.readScoreDoc(in);
+                if (in.readBoolean()) {
+                    this.hit = InternalSearchHit.readSearchHit(in,
+                        InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+                }
                 int payloadSize = in.readInt();
                 this.payload = new LinkedHashMap<>(payloadSize);
                 for (int i = 0; i < payloadSize; i++) {
@@ -234,6 +308,13 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
             @Override
             public void writeTo(StreamOutput out) throws IOException {
                 super.writeTo(out);
+                Lucene.writeScoreDoc(out, doc);
+                if (hit != null) {
+                    out.writeBoolean(true);
+                    hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
+                } else {
+                    out.writeBoolean(false);
+                }
                 out.writeInt(payload.size());
                 for (Map.Entry<String, List<Object>> entry : payload.entrySet()) {
                     out.writeString(entry.getKey());

+ 234 - 0
core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java

@@ -0,0 +1,234 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.controller;
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.fetch.FetchSearchResultProvider;
+import org.elasticsearch.search.internal.InternalSearchHit;
+import org.elasticsearch.search.internal.InternalSearchHits;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.query.QuerySearchResult;
+import org.elasticsearch.search.query.QuerySearchResultProvider;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class SearchPhaseControllerTests extends ESTestCase {
+    private SearchPhaseController searchPhaseController;
+
+    @Before
+    public void setup() {
+        searchPhaseController = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null, null);
+    }
+
+    public void testSort() throws Exception {
+        List<CompletionSuggestion> suggestions = new ArrayList<>();
+        for (int i = 0; i < randomIntBetween(1, 5); i++) {
+            suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+        }
+        int nShards = randomIntBetween(1, 20);
+        int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
+        AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize);
+        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results);
+        int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+        for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
+            int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
+            accumulatedLength += suggestionSize;
+        }
+        assertThat(sortedDocs.length, equalTo(accumulatedLength));
+    }
+
+    public void testMerge() throws IOException {
+        List<CompletionSuggestion> suggestions = new ArrayList<>();
+        for (int i = 0; i < randomIntBetween(1, 5); i++) {
+            suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+        }
+        int nShards = randomIntBetween(1, 20);
+        int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
+        AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize);
+
+        // calculate offsets and score doc array
+        List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
+        ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
+        mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
+        Suggest mergedSuggest = reducedSuggest(queryResults);
+        for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+            if (suggestion instanceof CompletionSuggestion) {
+                CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
+                mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
+                    .map(CompletionSuggestion.Entry.Option::getDoc)
+                    .collect(Collectors.toList()));
+            }
+        }
+        ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
+
+        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, queryResults,
+            generateFetchResults(nShards, mergedSearchDocs, mergedSuggest));
+        assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+        Suggest suggestResult = mergedResponse.suggest();
+        for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+            assertThat(suggestion, instanceOf(CompletionSuggestion.class));
+            if (suggestion.getEntries().get(0).getOptions().size() > 0) {
+                CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
+                assertNotNull(suggestionResult);
+                List<CompletionSuggestion.Entry.Option> options = suggestionResult.getEntries().get(0).getOptions();
+                assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
+                for (CompletionSuggestion.Entry.Option option : options) {
+                    assertNotNull(option.getHit());
+                }
+            }
+        }
+    }
+
+    private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
+                                                                        List<CompletionSuggestion> suggestions,
+                                                                        int searchHitsSize) {
+        AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards);
+        for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
+            QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
+                new SearchShardTarget("", new Index("", ""), shardIndex));
+            TopDocs topDocs = new TopDocs(0, new ScoreDoc[0], 0);
+            if (searchHitsSize > 0) {
+                int nDocs = randomIntBetween(0, searchHitsSize);
+                ScoreDoc[] scoreDocs = new ScoreDoc[nDocs];
+                float maxScore = 0F;
+                for (int i = 0; i < nDocs; i++) {
+                    float score = Math.abs(randomFloat());
+                    scoreDocs[i] = new ScoreDoc(i, score);
+                    if (score > maxScore) {
+                        maxScore = score;
+                    }
+                }
+                topDocs = new TopDocs(scoreDocs.length, scoreDocs, maxScore);
+            }
+            List<CompletionSuggestion> shardSuggestion = new ArrayList<>();
+            for (CompletionSuggestion completionSuggestion : suggestions) {
+                CompletionSuggestion suggestion = new CompletionSuggestion(
+                    completionSuggestion.getName(), completionSuggestion.getSize());
+                final CompletionSuggestion.Entry completionEntry = new CompletionSuggestion.Entry(new Text(""), 0, 5);
+                suggestion.addTerm(completionEntry);
+                int optionSize = randomIntBetween(1, suggestion.getSize());
+                float maxScore = randomIntBetween(suggestion.getSize(), (int) Float.MAX_VALUE);
+                for (int i = 0; i < optionSize; i++) {
+                    completionEntry.addOption(new CompletionSuggestion.Entry.Option(i, new Text(""), maxScore,
+                        Collections.emptyMap(), Collections.emptyMap()));
+                    float dec = randomIntBetween(0, optionSize);
+                    if (dec <= maxScore) {
+                        maxScore -= dec;
+                    }
+                }
+                suggestion.setShardIndex(shardIndex);
+                shardSuggestion.add(suggestion);
+            }
+            querySearchResult.topDocs(topDocs, null);
+            querySearchResult.size(searchHitsSize);
+            querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion)));
+            queryResults.set(shardIndex, querySearchResult);
+        }
+        return queryResults;
+    }
+
+    private int getTotalQueryHits(AtomicArray<QuerySearchResultProvider> results) {
+        int resultCount = 0;
+        for (AtomicArray.Entry<QuerySearchResultProvider> shardResult : results.asList()) {
+            resultCount += shardResult.value.queryResult().topDocs().totalHits;
+        }
+        return resultCount;
+    }
+
+    private Suggest reducedSuggest(AtomicArray<QuerySearchResultProvider> results) {
+        Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>();
+        for (AtomicArray.Entry<QuerySearchResultProvider> entry : results.asList()) {
+            for (Suggest.Suggestion<?> suggestion : entry.value.queryResult().suggest()) {
+                List<Suggest.Suggestion<CompletionSuggestion.Entry>> suggests =
+                    groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                suggests.add((Suggest.Suggestion<CompletionSuggestion.Entry>) suggestion);
+            }
+        }
+        return new Suggest(groupedSuggestion.values().stream().map(CompletionSuggestion::reduceTo)
+            .collect(Collectors.toList()));
+    }
+
+    private ScoreDoc[] getTopShardDocs(AtomicArray<QuerySearchResultProvider> results) throws IOException {
+        List<AtomicArray.Entry<QuerySearchResultProvider>> resultList = results.asList();
+        TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
+        for (int i = 0; i < resultList.size(); i++) {
+            shardTopDocs[i] = resultList.get(i).value.queryResult().topDocs();
+        }
+        int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
+        return TopDocs.merge(topN, shardTopDocs).scoreDocs;
+    }
+
+    private AtomicArray<FetchSearchResultProvider> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
+        AtomicArray<FetchSearchResultProvider> fetchResults = new AtomicArray<>(nShards);
+        for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
+            float maxScore = -1F;
+            SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex);
+            FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
+            List<InternalSearchHit> internalSearchHits = new ArrayList<>();
+            for (ScoreDoc scoreDoc : mergedSearchDocs) {
+                if (scoreDoc.shardIndex == shardIndex) {
+                    internalSearchHits.add(new InternalSearchHit(scoreDoc.doc, "", new Text(""), Collections.emptyMap()));
+                    if (scoreDoc.score > maxScore) {
+                        maxScore = scoreDoc.score;
+                    }
+                }
+            }
+            for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+                if (suggestion instanceof CompletionSuggestion) {
+                    for (CompletionSuggestion.Entry.Option option : ((CompletionSuggestion) suggestion).getOptions()) {
+                        ScoreDoc doc = option.getDoc();
+                        if (doc.shardIndex == shardIndex) {
+                            internalSearchHits.add(new InternalSearchHit(doc.doc, "", new Text(""), Collections.emptyMap()));
+                            if (doc.score > maxScore) {
+                                maxScore = doc.score;
+                            }
+                        }
+                    }
+                }
+            }
+            InternalSearchHit[] hits = internalSearchHits.toArray(new InternalSearchHit[internalSearchHits.size()]);
+            fetchSearchResult.hits(new InternalSearchHits(hits, hits.length, maxScore));
+            fetchResults.set(shardIndex, fetchSearchResult);
+        }
+        return fetchResults;
+    }
+}

+ 113 - 0
core/src/test/java/org/elasticsearch/search/suggest/CompletionSuggestSearchIT.java

@@ -63,6 +63,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -72,6 +73,9 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHit;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasScore;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -80,6 +84,7 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 
 @SuppressCodecs("*") // requires custom completion format
@@ -391,6 +396,114 @@ public class CompletionSuggestSearchIT extends ESIntegTestCase {
         }
     }
 
+    public void testSuggestDocument() throws Exception {
+        final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+        createIndexAndMapping(mapping);
+        int numDocs = randomIntBetween(10, 100);
+        List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        for (int i = 1; i <= numDocs; i++) {
+            indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+                .setSource(jsonBuilder()
+                    .startObject()
+                    .startObject(FIELD)
+                    .field("input", "suggestion" + i)
+                    .field("weight", i)
+                    .endObject()
+                    .endObject()
+                ));
+        }
+        indexRandom(true, indexRequestBuilders);
+        CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+        SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(new SuggestBuilder().addSuggestion("foo", prefix)).get();
+        CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+        CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+        assertThat(options.getOptions().size(), equalTo(numDocs));
+        int id = numDocs;
+        for (CompletionSuggestion.Entry.Option option : options) {
+            assertThat(option.getText().toString(), equalTo("suggestion" + id));
+            assertSearchHit(option.getHit(), hasId("" + id));
+            assertSearchHit(option.getHit(), hasScore(((float) id)));
+            assertNotNull(option.getHit().source());
+            id--;
+        }
+    }
+
+    public void testSuggestDocumentNoSource() throws Exception {
+        final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+        createIndexAndMapping(mapping);
+        int numDocs = randomIntBetween(10, 100);
+        List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        for (int i = 1; i <= numDocs; i++) {
+            indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+                .setSource(jsonBuilder()
+                    .startObject()
+                    .startObject(FIELD)
+                    .field("input", "suggestion" + i)
+                    .field("weight", i)
+                    .endObject()
+                    .endObject()
+                ));
+        }
+        indexRandom(true, indexRequestBuilders);
+        CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+        SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(
+            new SuggestBuilder().addSuggestion("foo", prefix)
+        ).setFetchSource(false).get();
+        CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+        CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+        assertThat(options.getOptions().size(), equalTo(numDocs));
+        int id = numDocs;
+        for (CompletionSuggestion.Entry.Option option : options) {
+            assertThat(option.getText().toString(), equalTo("suggestion" + id));
+            assertSearchHit(option.getHit(), hasId("" + id));
+            assertSearchHit(option.getHit(), hasScore(((float) id)));
+            assertNull(option.getHit().source());
+            id--;
+        }
+    }
+
+    public void testSuggestDocumentSourceFiltering() throws Exception {
+        final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
+        createIndexAndMapping(mapping);
+        int numDocs = randomIntBetween(10, 100);
+        List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        for (int i = 1; i <= numDocs; i++) {
+            indexRequestBuilders.add(client().prepareIndex(INDEX, TYPE, "" + i)
+                .setSource(jsonBuilder()
+                    .startObject()
+                    .startObject(FIELD)
+                    .field("input", "suggestion" + i)
+                    .field("weight", i)
+                    .endObject()
+                    .field("a", "include")
+                    .field("b", "exclude")
+                    .endObject()
+                ));
+        }
+        indexRandom(true, indexRequestBuilders);
+        CompletionSuggestionBuilder prefix = SuggestBuilders.completionSuggestion(FIELD).prefix("sugg").size(numDocs);
+
+        SearchResponse searchResponse = client().prepareSearch(INDEX).suggest(
+            new SuggestBuilder().addSuggestion("foo", prefix)
+        ).setFetchSource("a", "b").get();
+        CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion("foo");
+        CompletionSuggestion.Entry options = completionSuggestion.getEntries().get(0);
+        assertThat(options.getOptions().size(), equalTo(numDocs));
+        int id = numDocs;
+        for (CompletionSuggestion.Entry.Option option : options) {
+            assertThat(option.getText().toString(), equalTo("suggestion" + id));
+            assertSearchHit(option.getHit(), hasId("" + id));
+            assertSearchHit(option.getHit(), hasScore(((float) id)));
+            assertNotNull(option.getHit().source());
+            Set<String> sourceFields = option.getHit().sourceAsMap().keySet();
+            assertThat(sourceFields, contains("a"));
+            assertThat(sourceFields, not(contains("b")));
+            id--;
+        }
+    }
+
     public void testThatWeightsAreWorking() throws Exception {
         createIndexAndMapping(completionMappingBuilder);
 

+ 73 - 0
core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java

@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.suggest;
+
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
+import org.elasticsearch.search.suggest.phrase.PhraseSuggestion;
+import org.elasticsearch.search.suggest.term.TermSuggestion;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class SuggestTests extends ESTestCase {
+
+    public void testFilter() throws Exception {
+        List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> suggestions;
+        CompletionSuggestion completionSuggestion = new CompletionSuggestion(randomAsciiOfLength(10), 2);
+        PhraseSuggestion phraseSuggestion = new PhraseSuggestion(randomAsciiOfLength(10), 2);
+        TermSuggestion termSuggestion = new TermSuggestion(randomAsciiOfLength(10), 2, SortBy.SCORE);
+        suggestions = Arrays.asList(completionSuggestion, phraseSuggestion, termSuggestion);
+        Suggest suggest = new Suggest(suggestions);
+        List<PhraseSuggestion> phraseSuggestions = suggest.filter(PhraseSuggestion.class);
+        assertThat(phraseSuggestions.size(), equalTo(1));
+        assertThat(phraseSuggestions.get(0), equalTo(phraseSuggestion));
+        List<TermSuggestion> termSuggestions = suggest.filter(TermSuggestion.class);
+        assertThat(termSuggestions.size(), equalTo(1));
+        assertThat(termSuggestions.get(0), equalTo(termSuggestion));
+        List<CompletionSuggestion> completionSuggestions = suggest.filter(CompletionSuggestion.class);
+        assertThat(completionSuggestions.size(), equalTo(1));
+        assertThat(completionSuggestions.get(0), equalTo(completionSuggestion));
+    }
+
+    public void testSuggestionOrdering() throws Exception {
+        List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> suggestions;
+        suggestions = new ArrayList<>();
+        int n = randomIntBetween(2, 5);
+        for (int i = 0; i < n; i++) {
+            suggestions.add(new CompletionSuggestion(randomAsciiOfLength(10), randomIntBetween(3, 5)));
+        }
+        Collections.shuffle(suggestions, random());
+        Suggest suggest = new Suggest(suggestions);
+        List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> sortedSuggestions;
+        sortedSuggestions = new ArrayList<>(suggestions);
+        sortedSuggestions.sort((o1, o2) -> o1.getName().compareTo(o2.getName()));
+        List<CompletionSuggestion> completionSuggestions = suggest.filter(CompletionSuggestion.class);
+        assertThat(completionSuggestions.size(), equalTo(n));
+        for (int i = 0; i < n; i++) {
+            assertThat(completionSuggestions.get(i).getName(), equalTo(sortedSuggestions.get(i).getName()));
+        }
+    }
+
+}

+ 61 - 0
core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.suggest.completion;
+
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+public class CompletionSuggestionTests extends ESTestCase {
+
+    public void testToReduce() throws Exception {
+        List<Suggest.Suggestion<CompletionSuggestion.Entry>> shardSuggestions = new ArrayList<>();
+        int nShards = randomIntBetween(1, 10);
+        String name = randomAsciiOfLength(10);
+        int size = randomIntBetween(3, 5);
+        for (int i = 0; i < nShards; i++) {
+            CompletionSuggestion suggestion = new CompletionSuggestion(name, size);
+            suggestion.addTerm(new CompletionSuggestion.Entry(new Text(""), 0, 0));
+            shardSuggestions.add(suggestion);
+        }
+        int totalResults = randomIntBetween(0, 5) * nShards;
+        float maxScore = randomIntBetween(totalResults, totalResults*2);
+        for (int i = 0; i < totalResults; i++) {
+            Suggest.Suggestion<CompletionSuggestion.Entry> suggestion = randomFrom(shardSuggestions);
+            suggestion.getEntries().get(0).addOption(new CompletionSuggestion.Entry.Option(i, new Text(""),
+                maxScore - i, Collections.emptyMap(), Collections.emptyMap()));
+        }
+        CompletionSuggestion reducedSuggestion = CompletionSuggestion.reduceTo(shardSuggestions);
+        assertNotNull(reducedSuggestion);
+        assertThat(reducedSuggestion.getOptions().size(), lessThanOrEqualTo(size));
+        int count = 0;
+        for (CompletionSuggestion.Entry.Option option : reducedSuggestion.getOptions()) {
+            assertThat(option.getDoc().doc, equalTo(count));
+            count++;
+        }
+    }
+}

+ 20 - 5
docs/reference/search/suggesters/completion-suggest.asciidoc

@@ -181,15 +181,23 @@ returns this response:
     "length" : 3,
     "options" : [ {
       "text" : "Nirvana",
-      "score" : 1.0
+      "_index": "music",
+      "_type": "song",
+      "_id": "1",
+      "_score": 1.0,
+      "_source": {
+        "suggest": ["Nevermind", "Nirvana"]
+      }
     } ]
   } ]
 }
 --------------------------------------------------
 // TESTRESPONSE
 
-The configured weight for a suggestion is returned as `score`.
-The `text` field uses the `input` of your indexed suggestion.
+The configured weight for a suggestion is returned as `_score`.
+The `text` field uses the `input` of your indexed suggestion. The document
+source is returned in `_source`. <<search-request-source-filtering, source filtering>>
+parameters are supported for filtering the document source.
 
 Suggestions are document oriented, you can specify fields to be
 returned as part of suggestion payload. All field types (`string`,
@@ -200,7 +208,7 @@ as follows:
 
 [source,js]
 --------------------------------------------------
-POST music/song?refresh
+PUT music/song/2?refresh
 {
     "suggest" : "Nirvana",
     "title" : "Nevermind"
@@ -243,7 +251,14 @@ returns:
     "length" : 1,
     "options" : [ {
       "text" : "Nirvana",
-      "score" : 1.0,
+      "_index": "music",
+      "_type": "song",
+      "_id": "2",
+      "_score" : 1.0,
+      "_source": {
+        "title": "Nevermind",
+        "suggest": "Nirvana"
+      },
       "payload" : {
         "title" : [ "Nevermind" ]
       }