Improved the regular scroll API by using IndexSearcher#searchAfter instead of the regular search methods, which rely on `from` for pagination.

This prevents the creation of priority queues of size `from + size`; instead, the size of the priority queue will always be equal to `size` (see the sketch after the commit message).

Closes #4940
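
To illustrate the mechanics, here is a minimal sketch (not part of this commit; it assumes a plain Lucene 4.x `IndexSearcher` and a `MatchAllDocsQuery`). Offset-based paging collects `from + size` hits on every page, while `searchAfter` resumes from the last emitted hit and only ever collects `size`:

```java
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

import java.io.IOException;

class ScrollSketch {

    // Classic scroll page: the priority queue must hold from + size entries,
    // so deep pages get progressively more expensive.
    static TopDocs classicPage(IndexSearcher searcher, int from, int size) throws IOException {
        return searcher.search(new MatchAllDocsQuery(), from + size);
    }

    // searchAfter scroll page: the priority queue always holds exactly size
    // entries, regardless of how deep into the result set we are.
    static TopDocs nextPage(IndexSearcher searcher, ScoreDoc lastEmittedDoc, int size) throws IOException {
        return searcher.searchAfter(lastEmittedDoc, new MatchAllDocsQuery(), size);
    }
}
```

On the first page `lastEmittedDoc` is `null`, in which case `searchAfter` behaves like a regular top-N search; afterwards it is the last `ScoreDoc` of the previous page.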
Martijn van Groningen 11 years ago
parent commit 947c5f6920
33 files changed with 1035 additions and 157 deletions
  1. +3 -0 src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java
  2. +1 -1 src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java
  3. +9 -5 src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java
  4. +2 -2 src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java
  5. +1 -1 src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java
  6. +9 -6 src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java
  7. +15 -3 src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java
  8. +40 -8 src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java
  9. +25 -1 src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java
  10. +90 -72 src/main/java/org/elasticsearch/common/lucene/Lucene.java
  11. +30 -3 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/BytesRefOrdValComparator.java
  12. +4 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/BytesRefValComparator.java
  13. +5 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/DoubleScriptDataComparator.java
  14. +5 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/DoubleValuesComparatorBase.java
  15. +5 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/GeoDistanceComparator.java
  16. +5 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/LongValuesComparatorBase.java
  17. +7 -0 src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/NestedWrappableComparator.java
  18. +106 -8 src/main/java/org/elasticsearch/index/search/nested/NestedFieldComparatorSource.java
  19. +21 -4 src/main/java/org/elasticsearch/percolator/PercolateContext.java
  20. +33 -20 src/main/java/org/elasticsearch/search/SearchService.java
  21. +103 -3 src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
  22. +37 -0 src/main/java/org/elasticsearch/search/fetch/FetchSearchRequest.java
  23. +24 -0 src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
  24. +6 -2 src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
  25. +8 -0 src/main/java/org/elasticsearch/search/internal/SearchContext.java
  26. +25 -2 src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java
  27. +38 -7 src/main/java/org/elasticsearch/search/query/QueryPhase.java
  28. +15 -0 src/main/java/org/elasticsearch/search/sort/SortBuilder.java
  29. +20 -0 src/test/java/org/elasticsearch/index/search/child/TestSearchContext.java
  30. +18 -9 src/test/java/org/elasticsearch/nested/SimpleNestedTests.java
  31. +229 -0 src/test/java/org/elasticsearch/search/scroll/DuelScrollTests.java
  32. +48 -0 src/test/java/org/elasticsearch/search/scroll/SlowDuelScrollTests.java
  33. +48 -0 src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java

+ 3 - 0
src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.search.type;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.collect.Tuple;
 
 import java.util.Map;
@@ -34,6 +35,8 @@ public class ParsedScrollId {
 
     public static final String SCAN = "scan";
 
+    public static final Version SCROLL_SEARCH_AFTER_MINIMUM_VERSION = Version.V_1_2_0;
+
     private final String source;
 
     private final String type;

+ 1 - 1
src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java

@@ -179,7 +179,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
         }
 
         void innerFinishHim() throws Exception {
-            sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
+            sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryFetchResults);
             final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
             String scrollId = null;
             if (request.scroll() != null) {

+ 9 - 5
src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.search.type;
 
 import com.carrotsearch.hppc.IntArrayList;
+import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.ReduceSearchPhaseException;
 import org.elasticsearch.action.search.SearchOperationThreading;
@@ -181,8 +182,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
             }
         }
 
-        void innerExecuteFetchPhase() {
-            sortedShardList = searchPhaseController.sortDocs(queryResults);
+        void innerExecuteFetchPhase() throws Exception {
+            sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryResults);
             searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
 
             if (docIdsToLoad.asList().isEmpty()) {
@@ -190,6 +191,9 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
                 return;
             }
 
+            final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
+                    request, sortedShardList, firstResults.length()
+            );
             final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
             int localOperations = 0;
             for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
@@ -198,7 +202,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
                 if (node.id().equals(nodes.localNodeId())) {
                     localOperations++;
                 } else {
-                    FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                    FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                     executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
                 }
             }
@@ -212,7 +216,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
                                 QuerySearchResult queryResult = queryResults.get(entry.index);
                                 DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
                                 if (node.id().equals(nodes.localNodeId())) {
-                                    FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                                    FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                                     executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
                                 }
                             }
@@ -224,7 +228,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
                         final QuerySearchResult queryResult = queryResults.get(entry.index);
                         final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
                         if (node.id().equals(nodes.localNodeId())) {
-                            final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                            final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                             try {
                                 if (localAsync) {
                                     threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {

+ 2 - 2
src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java

@@ -47,8 +47,8 @@ import java.util.Map;
  */
 public abstract class TransportSearchHelper {
 
-    public static ShardSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
-        ShardSearchRequest shardRequest = new ShardSearchRequest(request, shardRouting, numberOfShards);
+    public static ShardSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis, boolean useSlowScroll) {
+        ShardSearchRequest shardRequest = new ShardSearchRequest(request, shardRouting, numberOfShards, useSlowScroll);
         shardRequest.filteringAliases(filteringAliases);
         shardRequest.nowInMillis(nowInMillis);
         return shardRequest;

+ 1 - 1
src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java

@@ -85,7 +85,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
         }
 
         private void innerFinishHim() throws IOException {
-            sortedShardList = searchPhaseController.sortDocs(firstResults);
+            sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
             final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
             String scrollId = null;
             if (request.scroll() != null) {

+ 9 - 6
src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.search.type;
 
 import com.carrotsearch.hppc.IntArrayList;
+import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.ReduceSearchPhaseException;
 import org.elasticsearch.action.search.SearchOperationThreading;
@@ -81,8 +82,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
         }
 
         @Override
-        protected void moveToSecondPhase() {
-            sortedShardList = searchPhaseController.sortDocs(firstResults);
+        protected void moveToSecondPhase() throws Exception {
+            sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
             searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
 
             if (docIdsToLoad.asList().isEmpty()) {
@@ -90,8 +91,10 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
                 return;
             }
 
+            final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
+                    request, sortedShardList, firstResults.length()
+            );
             final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
-
             int localOperations = 0;
             for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
                 QuerySearchResult queryResult = firstResults.get(entry.index);
@@ -99,7 +102,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
                 if (node.id().equals(nodes.localNodeId())) {
                     localOperations++;
                 } else {
-                    FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                    FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                     executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
                 }
             }
@@ -113,7 +116,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
                                 QuerySearchResult queryResult = firstResults.get(entry.index);
                                 DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
                                 if (node.id().equals(nodes.localNodeId())) {
-                                    FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                                    FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                                     executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
                                 }
                             }
@@ -125,7 +128,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
                         final QuerySearchResult queryResult = firstResults.get(entry.index);
                         final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
                         if (node.id().equals(nodes.localNodeId())) {
-                            final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, queryResult.id(), entry.value);
+                            final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
                             try {
                                 if (localAsync) {
                                     threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {

+ 15 - 3
src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.search.type;
 
 import org.apache.lucene.search.ScoreDoc;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.*;
 import org.elasticsearch.cluster.ClusterService;
@@ -34,6 +35,7 @@ import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;
 import org.elasticsearch.search.fetch.QueryFetchSearchResult;
+import org.elasticsearch.search.internal.InternalScrollSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -72,6 +74,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
     private class AsyncAction {
 
         private final SearchScrollRequest request;
+        private volatile boolean useSlowScroll;
 
         private final ActionListener<SearchResponse> listener;
 
@@ -131,6 +134,9 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
                 Tuple<String, Long> target = context[i];
                 DiscoveryNode node = nodes.get(target.v1());
                 if (node != null) {
+                    if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+                        useSlowScroll = true;
+                    }
                     if (nodes.localNodeId().equals(node.id())) {
                         localOperations++;
                     } else {
@@ -205,7 +211,8 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
         }
 
         void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
-            searchService.sendExecuteFetch(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QueryFetchSearchResult>() {
+            InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
+            searchService.sendExecuteFetch(node, internalRequest, new SearchServiceListener<QueryFetchSearchResult>() {
                 @Override
                 public void onResult(QueryFetchSearchResult result) {
                     queryFetchResults.set(shardIndex, result);
@@ -240,8 +247,13 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
             }
         }
 
-        private void innerFinishHim() {
-            ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
+        private void innerFinishHim() throws Exception {
+            ScoreDoc[] sortedShardList;
+            if (useSlowScroll) {
+                sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
+            } else {
+                sortedShardList = searchPhaseController.sortDocsForScroll(queryFetchResults);
+            }
             final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
             String scrollId = null;
             if (request.scroll() != null) {

+ 40 - 8
src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java

@@ -36,6 +36,7 @@ import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;
 import org.elasticsearch.search.fetch.FetchSearchRequest;
 import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.internal.InternalScrollSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -92,6 +93,8 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
 
         private final long startTime = System.currentTimeMillis();
 
+        private volatile boolean useSlowScroll;
+
         private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
             this.request = request;
             this.listener = listener;
@@ -137,6 +140,9 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
                 Tuple<String, Long> target = context[i];
                 DiscoveryNode node = nodes.get(target.v1());
                 if (node != null) {
+                    if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+                        useSlowScroll = true;
+                    }
                     if (nodes.localNodeId().equals(node.id())) {
                         localOperations++;
                     } else {
@@ -148,7 +154,12 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
                     }
                     successfulOps.decrementAndGet();
                     if (counter.decrementAndGet() == 0) {
-                        executeFetchPhase();
+                        try {
+                            executeFetchPhase();
+                        } catch (Throwable e) {
+                            listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, null));
+                            return;
+                        }
                     }
                 }
             }
@@ -197,12 +208,17 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
         }
 
         private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
-            searchService.sendExecuteQuery(node, internalScrollSearchRequest(searchId, request), new SearchServiceListener<QuerySearchResult>() {
+            InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
+            searchService.sendExecuteQuery(node, internalRequest, new SearchServiceListener<QuerySearchResult>() {
                 @Override
                 public void onResult(QuerySearchResult result) {
                     queryResults.set(shardIndex, result);
                     if (counter.decrementAndGet() == 0) {
-                        executeFetchPhase();
+                        try {
+                            executeFetchPhase();
+                        } catch (Throwable e) {
+                            onFailure(e);
+                        }
                     }
                 }
 
@@ -220,25 +236,41 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
             addShardFailure(shardIndex, new ShardSearchFailure(t));
             successfulOps.decrementAndGet();
             if (counter.decrementAndGet() == 0) {
-                executeFetchPhase();
+                try {
+                    executeFetchPhase();
+                } catch (Throwable e) {
+                    listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, null));
+                }
             }
         }
 
-        private void executeFetchPhase() {
-            sortedShardList = searchPhaseController.sortDocs(queryResults);
+        private void executeFetchPhase() throws Exception {
+            if (useSlowScroll) {
+                sortedShardList = searchPhaseController.sortDocs(queryResults);
+            } else {
+                sortedShardList = searchPhaseController.sortDocsForScroll(queryResults);
+            }
             AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<IntArrayList>(queryResults.length());
             searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
 
             if (docIdsToLoad.asList().isEmpty()) {
                 finishHim();
+                return;
             }
 
-            final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
 
+            final ScoreDoc[] lastEmittedDocPerShard;
+            if (useSlowScroll) {
+                lastEmittedDocPerShard = new ScoreDoc[queryResults.length()];
+            } else {
+                lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
+            }
+            final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
             for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
                 IntArrayList docIds = entry.value;
                 final QuerySearchResult querySearchResult = queryResults.get(entry.index);
-                FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds);
+                ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
+                FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
                 DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
                 searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
                     @Override
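
A worked illustration of the `lastEmittedDocPerShard` bookkeeping used above (hypothetical values; `ScoreDoc`'s three-argument constructor carries the shard index, as used by `TopDocs#merge`):

```java
import org.apache.lucene.search.ScoreDoc;

class LastEmittedSketch {
    // A merged scroll page of four hits from two shards (doc, score, shardIndex):
    static final ScoreDoc[] SORTED_SHARD_LIST = {
            new ScoreDoc(10, 4.0f, 0),
            new ScoreDoc(7, 3.5f, 1),
            new ScoreDoc(13, 3.0f, 0),
            new ScoreDoc(2, 2.0f, 1)
    };
    // Replaying the loop in getLastEmittedDocPerShard leaves the deepest hit per shard:
    //   lastEmittedDocPerShard[0] -> doc 13 (shard 0)
    //   lastEmittedDocPerShard[1] -> doc 2  (shard 1)
    // Each entry rides along in that shard's FetchSearchRequest, so the shard can
    // resume the next scroll round with IndexSearcher#searchAfter from that hit.
}
```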

+ 25 - 1
src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java

@@ -42,8 +42,10 @@ import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;
+import org.elasticsearch.search.fetch.FetchSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -95,6 +97,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
         private final Object shardFailuresMutex = new Object();
         protected volatile ScoreDoc[] sortedShardList;
 
+        protected final boolean useSlowScroll;
         protected final long startTime = System.currentTimeMillis();
 
         protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
@@ -120,6 +123,18 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
             expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
 
             firstResults = new AtomicArray<FirstResult>(shardsIts.size());
+            // Not so nice, but we need to know if there are nodes below the supported version,
+            // and if so fall back to the classic scroll (based on from). We need to check every node,
+            // because we don't know to what nodes we end up sending the request (a shard may fail or relocate)
+            boolean useSlowScroll = false;
+            if (request.scroll() != null) {
+                for (DiscoveryNode discoveryNode : clusterState.nodes()) {
+                    if (discoveryNode.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+                        useSlowScroll = true;
+                    }
+                }
+            }
+            this.useSlowScroll = useSlowScroll;
         }
 
         public void start() {
@@ -213,7 +228,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
                     onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
                 } else {
                     String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
-                    sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
+                    sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime, useSlowScroll), new SearchServiceListener<FirstResult>() {
                         @Override
                         public void onResult(FirstResult result) {
                             onFirstPhaseResult(shardIndex, shard, result, shardIt);
@@ -381,6 +396,15 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
             }
         }
 
+        protected FetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) {
+            if (lastEmittedDocPerShard != null) {
+                ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
+                return new FetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
+            } else {
+                return new FetchSearchRequest(request, queryResult.id(), entry.value);
+            }
+        }
+
         protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<FirstResult> listener);
 
         protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {

+ 90 - 72
src/main/java/org/elasticsearch/common/lucene/Lucene.java

@@ -29,6 +29,7 @@ import org.apache.lucene.search.*;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Version;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -163,34 +164,7 @@ public class Lucene {
 
             FieldDoc[] fieldDocs = new FieldDoc[in.readVInt()];
             for (int i = 0; i < fieldDocs.length; i++) {
-                Comparable[] cFields = new Comparable[in.readVInt()];
-                for (int j = 0; j < cFields.length; j++) {
-                    byte type = in.readByte();
-                    if (type == 0) {
-                        cFields[j] = null;
-                    } else if (type == 1) {
-                        cFields[j] = in.readString();
-                    } else if (type == 2) {
-                        cFields[j] = in.readInt();
-                    } else if (type == 3) {
-                        cFields[j] = in.readLong();
-                    } else if (type == 4) {
-                        cFields[j] = in.readFloat();
-                    } else if (type == 5) {
-                        cFields[j] = in.readDouble();
-                    } else if (type == 6) {
-                        cFields[j] = in.readByte();
-                    } else if (type == 7) {
-                        cFields[j] = in.readShort();
-                    } else if (type == 8) {
-                        cFields[j] = in.readBoolean();
-                    } else if (type == 9) {
-                        cFields[j] = in.readBytesRef();
-                    } else {
-                        throw new IOException("Can't match type [" + type + "]");
-                    }
-                }
-                fieldDocs[i] = new FieldDoc(in.readVInt(), in.readFloat(), cFields);
+                fieldDocs[i] = readFieldDoc(in);
             }
             return new TopFieldDocs(totalHits, fieldDocs, fields, maxScore);
         } else {
@@ -205,6 +179,41 @@ public class Lucene {
         }
     }
 
+    public static FieldDoc readFieldDoc(StreamInput in) throws IOException {
+        Comparable[] cFields = new Comparable[in.readVInt()];
+        for (int j = 0; j < cFields.length; j++) {
+            byte type = in.readByte();
+            if (type == 0) {
+                cFields[j] = null;
+            } else if (type == 1) {
+                cFields[j] = in.readString();
+            } else if (type == 2) {
+                cFields[j] = in.readInt();
+            } else if (type == 3) {
+                cFields[j] = in.readLong();
+            } else if (type == 4) {
+                cFields[j] = in.readFloat();
+            } else if (type == 5) {
+                cFields[j] = in.readDouble();
+            } else if (type == 6) {
+                cFields[j] = in.readByte();
+            } else if (type == 7) {
+                cFields[j] = in.readShort();
+            } else if (type == 8) {
+                cFields[j] = in.readBoolean();
+            } else if (type == 9) {
+                cFields[j] = in.readBytesRef();
+            } else {
+                throw new IOException("Can't match type [" + type + "]");
+            }
+        }
+        return new FieldDoc(in.readVInt(), in.readFloat(), cFields);
+    }
+
+    public static ScoreDoc readScoreDoc(StreamInput in) throws IOException {
+        return new ScoreDoc(in.readVInt(), in.readFloat());
+    }
+
     public static void writeTopDocs(StreamOutput out, TopDocs topDocs, int from) throws IOException {
         if (topDocs.scoreDocs.length - from < 0) {
             out.writeBoolean(false);
@@ -240,48 +249,7 @@ public class Lucene {
                 if (index++ < from) {
                     continue;
                 }
-                FieldDoc fieldDoc = (FieldDoc) doc;
-                out.writeVInt(fieldDoc.fields.length);
-                for (Object field : fieldDoc.fields) {
-                    if (field == null) {
-                        out.writeByte((byte) 0);
-                    } else {
-                        Class type = field.getClass();
-                        if (type == String.class) {
-                            out.writeByte((byte) 1);
-                            out.writeString((String) field);
-                        } else if (type == Integer.class) {
-                            out.writeByte((byte) 2);
-                            out.writeInt((Integer) field);
-                        } else if (type == Long.class) {
-                            out.writeByte((byte) 3);
-                            out.writeLong((Long) field);
-                        } else if (type == Float.class) {
-                            out.writeByte((byte) 4);
-                            out.writeFloat((Float) field);
-                        } else if (type == Double.class) {
-                            out.writeByte((byte) 5);
-                            out.writeDouble((Double) field);
-                        } else if (type == Byte.class) {
-                            out.writeByte((byte) 6);
-                            out.writeByte((Byte) field);
-                        } else if (type == Short.class) {
-                            out.writeByte((byte) 7);
-                            out.writeShort((Short) field);
-                        } else if (type == Boolean.class) {
-                            out.writeByte((byte) 8);
-                            out.writeBoolean((Boolean) field);
-                        } else if (type == BytesRef.class) {
-                            out.writeByte((byte) 9);
-                            out.writeBytesRef((BytesRef) field);
-                        } else {
-                            throw new IOException("Can't handle sort field value of type [" + type + "]");
-                        }
-                    }
-                }
-
-                out.writeVInt(doc.doc);
-                out.writeFloat(doc.score);
+                writeFieldDoc(out, (FieldDoc) doc);
             }
         } else {
             out.writeBoolean(false);
@@ -294,12 +262,62 @@ public class Lucene {
                 if (index++ < from) {
                     continue;
                 }
-                out.writeVInt(doc.doc);
-                out.writeFloat(doc.score);
+                writeScoreDoc(out, doc);
             }
         }
     }
 
+    public static void writeFieldDoc(StreamOutput out, FieldDoc fieldDoc) throws IOException {
+        out.writeVInt(fieldDoc.fields.length);
+        for (Object field : fieldDoc.fields) {
+            if (field == null) {
+                out.writeByte((byte) 0);
+            } else {
+                Class type = field.getClass();
+                if (type == String.class) {
+                    out.writeByte((byte) 1);
+                    out.writeString((String) field);
+                } else if (type == Integer.class) {
+                    out.writeByte((byte) 2);
+                    out.writeInt((Integer) field);
+                } else if (type == Long.class) {
+                    out.writeByte((byte) 3);
+                    out.writeLong((Long) field);
+                } else if (type == Float.class) {
+                    out.writeByte((byte) 4);
+                    out.writeFloat((Float) field);
+                } else if (type == Double.class) {
+                    out.writeByte((byte) 5);
+                    out.writeDouble((Double) field);
+                } else if (type == Byte.class) {
+                    out.writeByte((byte) 6);
+                    out.writeByte((Byte) field);
+                } else if (type == Short.class) {
+                    out.writeByte((byte) 7);
+                    out.writeShort((Short) field);
+                } else if (type == Boolean.class) {
+                    out.writeByte((byte) 8);
+                    out.writeBoolean((Boolean) field);
+                } else if (type == BytesRef.class) {
+                    out.writeByte((byte) 9);
+                    out.writeBytesRef((BytesRef) field);
+                } else {
+                    throw new IOException("Can't handle sort field value of type [" + type + "]");
+                }
+            }
+        }
+        out.writeVInt(fieldDoc.doc);
+        out.writeFloat(fieldDoc.score);
+    }
+
+    public static void writeScoreDoc(StreamOutput out, ScoreDoc scoreDoc) throws IOException {
+        if (!scoreDoc.getClass().equals(ScoreDoc.class)) {
+            throw new ElasticsearchIllegalArgumentException("This method can only be used to serialize a ScoreDoc, not a " + scoreDoc.getClass());
+        }
+        out.writeVInt(scoreDoc.doc);
+        out.writeFloat(scoreDoc.score);
+    }
+
     // LUCENE 4 UPGRADE: We might want to maintain our own ordinal, instead of Lucene's ordinal
     public static SortField.Type readSortType(StreamInput in) throws IOException {
         return SortField.Type.values()[in.readVInt()];
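
The helpers extracted above make a single `FieldDoc`/`ScoreDoc` streamable on its own, which is what lets the last emitted doc travel inside a fetch request. A hypothetical round trip, assuming the 1.x `BytesStreamOutput`/`BytesStreamInput` stream classes:

```java
import org.apache.lucene.search.FieldDoc;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;

import java.io.IOException;

class FieldDocStreamSketch {
    static FieldDoc roundTrip(FieldDoc doc) throws IOException {
        // Sort values are written first (type-tagged), then doc id and score.
        BytesStreamOutput out = new BytesStreamOutput();
        Lucene.writeFieldDoc(out, doc);
        // readFieldDoc consumes the stream in the same order.
        return Lucene.readFieldDoc(new BytesStreamInput(out.bytes()));
    }
}
```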

+ 30 - 3
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/BytesRefOrdValComparator.java

@@ -85,7 +85,8 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
        @lucene.internal */
     long bottomOrd;
 
-    final BytesRef tempBR = new BytesRef();
+    BytesRef top;
+    long topOrd;
 
     public BytesRefOrdValComparator(IndexFieldData.WithOrdinals<?> indexFieldData, int numHits, SortMode sortMode, BytesRef missingValue) {
         this.indexFieldData = indexFieldData;
@@ -140,6 +141,10 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public int compareTopMissing() {
+        throw new UnsupportedOperationException();
+    }
 
     class PerSegmentComparator extends NestedWrappableComparator<BytesRef> {
         final Ordinals.Docs readerOrds;
@@ -205,7 +210,13 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
 
         @Override
         public int compareTop(int doc) throws IOException {
-            throw new UnsupportedOperationException("compareTop() not used for sorting in ES");
+            final long ord = getOrd(doc);
+            if (ord == Ordinals.MISSING_ORDINAL) {
+                return compareTopMissing();
+            } else {
+                final long comparableOrd = ord << 2;
+                return LongValuesComparator.compare(topOrd, comparableOrd);
+            }
         }
 
         @Override
@@ -214,6 +225,16 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
             return LongValuesComparator.compare(bottomOrd, missingOrd);
         }
 
+        @Override
+        public int compareTopMissing() {
+            int cmp =  LongValuesComparator.compare(topOrd, missingOrd);
+            if (cmp == 0) {
+                return compareValues(top, missingValue);
+            } else {
+                return cmp;
+            }
+        }
+
         @Override
         public void copy(int slot, int doc) {
             final long ord = getOrd(doc);
@@ -299,6 +320,12 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
         if (bottomSlot != -1) {
             perSegComp.setBottom(bottomSlot);
         }
+        if (top != null) {
+            perSegComp.setTopValue(top);
+            topOrd = ordInCurrentReader(termsIndex, top);
+        } else {
+            topOrd = missingOrd;
+        }
         return perSegComp;
     }
 
@@ -332,7 +359,7 @@ public final class BytesRefOrdValComparator extends NestedWrappableComparator<By
 
     @Override
     public void setTopValue(BytesRef value) {
-        throw new UnsupportedOperationException("setTopValue() not used for sorting in ES");
+        this.top = value;
     }
 
     @Override

+ 4 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/BytesRefValComparator.java

@@ -126,4 +126,8 @@ public final class BytesRefValComparator extends NestedWrappableComparator<Bytes
         return compareValues(bottom, missingValue);
     }
 
+    @Override
+    public int compareTopMissing() {
+        return compareValues(top, missingValue);
+    }
 }

+ 5 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/DoubleScriptDataComparator.java

@@ -147,4 +147,9 @@ public class DoubleScriptDataComparator extends NumberComparatorBase<Double> {
     public int compareBottomMissing() {
         return Double.compare(bottom, Double.MAX_VALUE);
     }
+
+    @Override
+    public int compareTopMissing() {
+        return Double.compare(top, Double.MAX_VALUE);
+    }
 }

+ 5 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/DoubleValuesComparatorBase.java

@@ -61,6 +61,11 @@ abstract class DoubleValuesComparatorBase<T extends Number> extends NumberCompar
         return compare(bottom, missingValue);
     }
 
+    @Override
+    public int compareTopMissing() {
+        return compare(top.doubleValue(), missingValue);
+    }
+
     static final int compare(double left, double right) {
         return Double.compare(left, right);
     }

+ 5 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/GeoDistanceComparator.java

@@ -122,6 +122,11 @@ public class GeoDistanceComparator extends NumberComparatorBase<Double> {
         return Double.compare(bottom, MISSING_VALUE);
     }
 
+    @Override
+    public int compareTopMissing() {
+        return Double.compare(top, MISSING_VALUE);
+    }
+
     // Computes the distance based on geo points.
     // Due to this abstractions the geo distance comparator doesn't need to deal with whether fields have one
     // or multiple geo points per document.

+ 5 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/LongValuesComparatorBase.java

@@ -71,4 +71,9 @@ abstract class LongValuesComparatorBase<T extends Number> extends NumberComparat
     public int compareBottomMissing() {
         return compare(bottom, missingValue);
     }
+
+    @Override
+    public int compareTopMissing() {
+        return compare(top.longValue(), missingValue);
+    }
 }

+ 7 - 0
src/main/java/org/elasticsearch/index/fielddata/fieldcomparator/NestedWrappableComparator.java

@@ -39,4 +39,11 @@ public abstract class NestedWrappableComparator<T> extends FieldComparator<T> {
      */
     public abstract int compareBottomMissing();
 
+    /**
+     * Compares the missing value to the top.
+     *
+     * @return any N < 0 if the top is not competitive with the missing value, any N > 0 if the top is competitive
+     * with the missing value, and 0 if they are equal.
+     */
+    public abstract int compareTopMissing();
 }

+ 106 - 8
src/main/java/org/elasticsearch/index/search/nested/NestedFieldComparatorSource.java

@@ -23,7 +23,6 @@ import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.FieldComparator;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.common.lucene.docset.DocIdSets;
@@ -88,6 +87,7 @@ abstract class NestedFieldComparator extends FieldComparator {
     FixedBitSet rootDocuments;
     FixedBitSet innerDocuments;
     int bottomSlot;
+    Object top;
 
     NestedFieldComparator(FieldComparator wrappedComparator, Filter rootDocumentsFilter, Filter innerDocumentsFilter, int spareSlot) {
         this.wrappedComparator = wrappedComparator;
@@ -137,12 +137,8 @@ abstract class NestedFieldComparator extends FieldComparator {
 
     @Override
     public void setTopValue(Object top) {
-        throw new UnsupportedOperationException("setTopValue() not used for sorting in ES");
-    }
-
-    @Override
-    public int compareTop(int doc) throws IOException {
-        throw new UnsupportedOperationException("compareTop() not used for sorting in ES");
+        this.top = top;
+        wrappedComparator.setTopValue(top);
     }
 
     final static class Lowest extends NestedFieldComparator {
@@ -214,6 +210,42 @@ abstract class NestedFieldComparator extends FieldComparator {
             }
         }
 
+        @Override
+        public int compareTop(int rootDoc) throws IOException {
+            if (rootDoc == 0 || rootDocuments == null || innerDocuments == null) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            // We need to copy the lowest value from all nested docs into slot.
+            int prevRootDoc = rootDocuments.prevSetBit(rootDoc - 1);
+            int nestedDoc = innerDocuments.nextSetBit(prevRootDoc + 1);
+            if (nestedDoc >= rootDoc || nestedDoc == -1) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            // We only need to emit a single cmp value for any matching nested doc
+            @SuppressWarnings("unchecked")
+            int cmp = wrappedComparator.compareTop(nestedDoc);
+            if (cmp > 0) {
+                return cmp;
+            }
+
+            while (true) {
+                nestedDoc = innerDocuments.nextSetBit(nestedDoc + 1);
+                if (nestedDoc >= rootDoc || nestedDoc == -1) {
+                    return cmp;
+                }
+                @SuppressWarnings("unchecked")
+                int cmp1 = wrappedComparator.compareTop(nestedDoc);
+                if (cmp1 > 0) {
+                    return cmp1;
+                } else {
+                    if (cmp1 == 0) {
+                        cmp = 0;
+                    }
+                }
+            }
+        }
     }
 
     final static class Highest extends NestedFieldComparator {
@@ -280,6 +312,38 @@ abstract class NestedFieldComparator extends FieldComparator {
             }
         }
 
+        @Override
+        public int compareTop(int rootDoc) throws IOException {
+            if (rootDoc == 0 || rootDocuments == null || innerDocuments == null) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            int prevRootDoc = rootDocuments.prevSetBit(rootDoc - 1);
+            int nestedDoc = innerDocuments.nextSetBit(prevRootDoc + 1);
+            if (nestedDoc >= rootDoc || nestedDoc == -1) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            @SuppressWarnings("unchecked")
+            int cmp = wrappedComparator.compareTop(nestedDoc);
+            if (cmp < 0) {
+                return cmp;
+            }
+
+            while (true) {
+                nestedDoc = innerDocuments.nextSetBit(nestedDoc + 1);
+                if (nestedDoc >= rootDoc || nestedDoc == -1) {
+                    return cmp;
+                }
+                @SuppressWarnings("unchecked")
+                int cmp1 = wrappedComparator.compareTop(nestedDoc);
+                if (cmp1 < 0) {
+                    return cmp1;
+                } else if (cmp1 == 0) {
+                    cmp = 0;
+                }
+            }
+        }
     }
     
     static abstract class NumericNestedFieldComparatorBase extends NestedFieldComparator {
@@ -337,7 +401,32 @@ abstract class NestedFieldComparator extends FieldComparator {
             }
             afterNested(slot, counter);
         }
-        
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public int compareTop(int rootDoc) throws IOException {
+            if (rootDoc == 0 || rootDocuments == null || innerDocuments == null) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            final int prevRootDoc = rootDocuments.prevSetBit(rootDoc - 1);
+            int nestedDoc = innerDocuments.nextSetBit(prevRootDoc + 1);
+            if (nestedDoc >= rootDoc || nestedDoc == -1) {
+                return compareTopMissing(wrappedComparator);
+            }
+
+            int counter = 1;
+            wrappedComparator.copy(spareSlot, nestedDoc);
+            nestedDoc = innerDocuments.nextSetBit(nestedDoc + 1);
+            while (nestedDoc > prevRootDoc && nestedDoc < rootDoc) {
+                onNested(spareSlot, nestedDoc);
+                nestedDoc = innerDocuments.nextSetBit(nestedDoc + 1);
+                counter++;
+            }
+            afterNested(spareSlot, counter);
+            return wrappedComparator.compareValues(wrappedComparator.value(spareSlot), top);
+        }
+
         protected abstract void onNested(int slot, int nestedDoc);
         
         protected abstract void afterNested(int slot, int count);
@@ -399,4 +488,13 @@ abstract class NestedFieldComparator extends FieldComparator {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    static final int compareTopMissing(FieldComparator<?> comparator) {
+        if (comparator instanceof NestedWrappableComparator) {
+            return ((NestedWrappableComparator) comparator).compareTopMissing();
+        } else {
+            return 0;
+        }
+    }
+
 }
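
The `compareTop` implementations above all follow the same block-join walk: find the previous root document, then iterate the inner documents between it and the current root. A small standalone sketch of that walk (hypothetical 7-document segment; the real bitsets come from the root/inner document filters):

```java
import org.apache.lucene.util.FixedBitSet;

class NestedWalkSketch {
    public static void main(String[] args) {
        // Hypothetical segment of 7 docs in block-join layout:
        // inner docs 0-2 belong to root doc 3, inner docs 4-5 to root doc 6.
        FixedBitSet rootDocuments = new FixedBitSet(7);
        rootDocuments.set(3);
        rootDocuments.set(6);
        FixedBitSet innerDocuments = new FixedBitSet(7);
        innerDocuments.set(0, 3); // docs 0, 1, 2
        innerDocuments.set(4, 6); // docs 4, 5

        // compareTop(rootDoc = 6) visits exactly that root's inner docs:
        int rootDoc = 6;
        int prevRootDoc = rootDocuments.prevSetBit(rootDoc - 1);    // 3
        int nestedDoc = innerDocuments.nextSetBit(prevRootDoc + 1); // 4
        while (nestedDoc != -1 && nestedDoc < rootDoc) {
            System.out.println("inner doc " + nestedDoc);           // prints 4, then 5
            // ... wrappedComparator.compareTop(nestedDoc) is folded in here ...
            nestedDoc = innerDocuments.nextSetBit(nestedDoc + 1);
        }
    }
}
```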

+ 21 - 4
src/main/java/org/elasticsearch/percolator/PercolateContext.java

@@ -22,10 +22,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.Filter;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.*;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.percolate.PercolateShardRequest;
 import org.elasticsearch.action.search.SearchType;
@@ -657,6 +654,16 @@ public class PercolateContext extends SearchContext {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void lastEmittedDoc(ScoreDoc doc) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ScoreDoc lastEmittedDoc() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public DfsSearchResult dfsResult() {
         throw new UnsupportedOperationException();
@@ -706,4 +713,14 @@ public class PercolateContext extends SearchContext {
     public MapperService.SmartNameObjectMapper smartNameObjectMapper(String name) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public boolean useSlowScroll() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SearchContext useSlowScroll(boolean useSlowScroll) {
+        throw new UnsupportedOperationException();
+    }
 }

+ 33 - 20
src/main/java/org/elasticsearch/search/SearchService.java

@@ -93,10 +93,10 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
 public class SearchService extends AbstractLifecycleComponent<SearchService> {
 
     public static final String NORMS_LOADING_KEY = "index.norms.loading";
-    private static final String DEFAUTL_KEEPALIVE_COMPONENENT_KEY ="default_keep_alive";
-    public static final String DEFAUTL_KEEPALIVE_KEY ="search."+DEFAUTL_KEEPALIVE_COMPONENENT_KEY;
-    private static final String KEEPALIVE_INTERVAL_COMPONENENT_KEY ="keep_alive_interval";
-    public static final String KEEPALIVE_INTERVAL_KEY ="search."+KEEPALIVE_INTERVAL_COMPONENENT_KEY;
+    private static final String DEFAUTL_KEEPALIVE_COMPONENENT_KEY = "default_keep_alive";
+    public static final String DEFAUTL_KEEPALIVE_KEY = "search." + DEFAUTL_KEEPALIVE_COMPONENENT_KEY;
+    private static final String KEEPALIVE_INTERVAL_COMPONENENT_KEY = "keep_alive_interval";
+    public static final String KEEPALIVE_INTERVAL_KEY = "search." + KEEPALIVE_INTERVAL_COMPONENENT_KEY;
 
 
     private final ThreadPool threadPool;
@@ -446,6 +446,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         SearchContext context = findContext(request.id());
         contextProcessing(context);
         try {
+            if (request.lastEmittedDoc() != null) {
+                context.lastEmittedDoc(request.lastEmittedDoc());
+            }
             context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
             context.indexShard().searchService().onPreFetchPhase(context);
             long time = System.nanoTime();
@@ -502,6 +505,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         SearchContext.setCurrent(context);
         try {
             context.scroll(request.scroll());
+            context.useSlowScroll(request.useSlowScroll());
 
             parseTemplate(request);
             parseSource(context, request.source());
@@ -646,24 +650,33 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
      * handles this as well since the result is always size * shards for Q_A_F
      */
     private void shortcutDocIdsToLoad(SearchContext context) {
-        TopDocs topDocs = context.queryResult().topDocs();
-        if (topDocs.scoreDocs.length < context.from()) {
-            // no more docs...
-            context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
-            return;
-        }
-        int totalSize = context.from() + context.size();
-        int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
-        int counter = 0;
-        for (int i = context.from(); i < totalSize; i++) {
-            if (i < topDocs.scoreDocs.length) {
-                docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
-            } else {
-                break;
+        if (!context.useSlowScroll() && context.request().scroll() != null) {
+            TopDocs topDocs = context.queryResult().topDocs();
+            int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
+            for (int i = 0; i < topDocs.scoreDocs.length; i++) {
+                docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
+            }
+            context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
+        } else {
+            TopDocs topDocs = context.queryResult().topDocs();
+            if (topDocs.scoreDocs.length < context.from()) {
+                // no more docs...
+                context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
+                return;
+            }
+            int totalSize = context.from() + context.size();
+            int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
+            int counter = 0;
+            for (int i = context.from(); i < totalSize; i++) {
+                if (i < topDocs.scoreDocs.length) {
+                    docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
+                } else {
+                    break;
+                }
+                counter++;
             }
-            counter++;
+            context.docIdsToLoad(docIdsToLoad, 0, counter);
         }
-        context.docIdsToLoad(docIdsToLoad, 0, counter);
     }
 
     private void shortcutDocIdsToLoadForScanning(SearchContext context) {
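
Restated as a standalone function (a sketch, not the method above verbatim): with a `searchAfter`-based scroll the shard's top docs already start at the right offset, so every hit is loaded; the classic path still has to skip the first `from` hits.

```java
import org.apache.lucene.search.ScoreDoc;

class DocIdsSketch {
    static int[] docIdsToLoad(ScoreDoc[] scoreDocs, int from, int size, boolean searchAfterScroll) {
        if (searchAfterScroll) {
            // searchAfter applied the offset during collection: load the whole page.
            int[] ids = new int[scoreDocs.length];
            for (int i = 0; i < scoreDocs.length; i++) {
                ids[i] = scoreDocs[i].doc;
            }
            return ids;
        }
        // Classic path: skip the first `from` hits, then take at most `size`.
        int n = Math.max(0, Math.min(scoreDocs.length - from, size));
        int[] ids = new int[n];
        for (int i = 0; i < n; i++) {
            ids[i] = scoreDocs[from + i].doc;
        }
        return ids;
    }
}
```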

+ 103 - 3
src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java

@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.cache.recycler.CacheRecycler;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -47,6 +48,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
 import org.elasticsearch.search.suggest.Suggest;
 
+import java.io.IOException;
 import java.util.*;
 
 /**
@@ -137,6 +139,102 @@ public class SearchPhaseController extends AbstractComponent {
         return Math.min(left, right) == -1 ? -1 : left + right;
     }
 
+    public ScoreDoc[] sortDocs(SearchRequest request, boolean useClassicSort, AtomicArray<? extends QuerySearchResultProvider> firstResults) throws IOException {
+        if (!useClassicSort && request.scroll() != null) {
+            return sortDocsForScroll(firstResults);
+        } else {
+            return sortDocs(firstResults);
+        }
+    }
+
+    public ScoreDoc[] sortDocsForScroll(AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
+        if (results.isEmpty()) {
+            return EMPTY_DOCS;
+        }
+
+        if (optimizeSingleShard) {
+            boolean canOptimize = false;
+            QuerySearchResult result = null;
+            int shardIndex = -1;
+            if (results.size() == 1) {
+                canOptimize = true;
+                result = results.get(0).value.queryResult();
+                shardIndex = results.get(0).index;
+            } else {
+                // lets see if we only got hits from a single shard, if so, we can optimize...
+                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
+                    if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
+                        if (result != null) { // we already have one, can't really optimize
+                            canOptimize = false;
+                            break;
+                        }
+                        canOptimize = true;
+                        result = entry.value.queryResult();
+                        shardIndex = entry.index;
+                    }
+                }
+            }
+            if (canOptimize) {
+                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
+                if (scoreDocs.length == 0) {
+                    return EMPTY_DOCS;
+                }
+                int resultDocsSize = Math.min(scoreDocs.length, result.size());
+                ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
+                for (int i = 0; i < resultDocsSize; i++) {
+                    ScoreDoc scoreDoc = scoreDocs[i];
+                    scoreDoc.shardIndex = shardIndex;
+                    docs[i] = scoreDoc;
+                }
+                return docs;
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
+        Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
+        QuerySearchResultProvider firstResult = sortedResults[0].value;
+
+        final Sort sort;
+        if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
+            TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
+            sort = new Sort(firstTopDocs.fields);
+        } else {
+            sort = null;
+        }
+
+        int topN = firstResult.queryResult().size();
+        TopDocs[] shardTopDocs = new TopDocs[sortedResults.length];
+        if (firstResult.includeFetch()) {
+            // if we did both query and fetch in the same round, we have already fetched all the docs from each shard, use them...
+            // this is also important since we shortcut and fetch only docs from "from" up to "size"
+            topN *= sortedResults.length;
+        }
+        for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+            TopDocs topDocs = sortedResult.value.queryResult().topDocs();
+            shardTopDocs[sortedResult.index] = topDocs;
+        }
+        TopDocs mergedTopDocs = TopDocs.merge(sort, topN, shardTopDocs);
+        return mergedTopDocs.scoreDocs;
+    }
+
+    public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) {
+        if (request.scroll() != null) {
+            return getLastEmittedDocPerShard(sortedShardList, numShards);
+        } else {
+            return null;
+        }
+    }
+
+    public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) {
+        ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
+        for (ScoreDoc scoreDoc : sortedShardList) {
+            lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc;
+        }
+        return lastEmittedDocPerShard;
+    }
+
     public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
         List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
         if (results.isEmpty()) {
@@ -174,10 +272,11 @@ public class SearchPhaseController extends AbstractComponent {
                 if ((scoreDocs.length - result.from()) < resultDocsSize) {
                     resultDocsSize = scoreDocs.length - result.from();
                 }
+                int offset = result.from();
                 if (result.topDocs() instanceof TopFieldDocs) {
                     ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
                     for (int i = 0; i < resultDocsSize; i++) {
-                        ScoreDoc scoreDoc = scoreDocs[result.from() + i];
+                        ScoreDoc scoreDoc = scoreDocs[offset + i];
                         scoreDoc.shardIndex = shardIndex;
                         docs[i] = scoreDoc;
                     }
@@ -185,7 +284,7 @@ public class SearchPhaseController extends AbstractComponent {
                 } else {
                     ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
                     for (int i = 0; i < resultDocsSize; i++) {
-                        ScoreDoc scoreDoc = scoreDocs[result.from() + i];
+                        ScoreDoc scoreDoc = scoreDocs[offset + i];
                         scoreDoc.shardIndex = shardIndex;
                         docs[i] = scoreDoc;
                     }
@@ -289,8 +388,9 @@ public class SearchPhaseController extends AbstractComponent {
         // we only pop the first, this handles "from" nicely since the "from" are down the queue
         // that we already fetched, so we are actually popping the "from" and up to "size"
         ScoreDoc[] shardDocs = new ScoreDoc[resultDocsSize];
-        for (int i = resultDocsSize - 1; i >= 0; i--)      // put docs in array
+        for (int i = resultDocsSize - 1; i >= 0; i--) {      // put docs in array
             shardDocs[i] = (ScoreDoc) queue.pop();
+        }
         return shardDocs;
     }
 

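`sortDocsForScroll` delegates the cross-shard merge to Lucene's `TopDocs.merge`, and `getLastEmittedDocPerShard` relies on the `shardIndex` that the merge stamps onto each hit. A minimal sketch of that contract (standalone, not the controller code itself):

```java
import java.io.IOException;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;

final class ShardMergeSketch {

    // shardTopDocs[i] must hold shard i's top docs so that merge() stamps
    // scoreDoc.shardIndex = i on every merged hit; a null sort merges by score
    static ScoreDoc[] mergeShards(Sort sort, int topN, TopDocs[] shardTopDocs) throws IOException {
        TopDocs merged = TopDocs.merge(sort, topN, shardTopDocs);
        return merged.scoreDocs; // each entry knows which shard emitted it
    }
}
```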
+ 37 - 0
src/main/java/org/elasticsearch/search/fetch/FetchSearchRequest.java

@@ -20,8 +20,13 @@
 package org.elasticsearch.search.fetch;
 
 import com.carrotsearch.hppc.IntArrayList;
+import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.ScoreDoc;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.type.ParsedScrollId;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
@@ -37,14 +42,21 @@ public class FetchSearchRequest extends TransportRequest {
 
     private int size;
 
+    private ScoreDoc lastEmittedDoc;
+
     public FetchSearchRequest() {
     }
 
     public FetchSearchRequest(TransportRequest request, long id, IntArrayList list) {
+        this(request, id, list, null);
+    }
+
+    public FetchSearchRequest(TransportRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
         super(request);
         this.id = id;
         this.docIds = list.buffer;
         this.size = list.size();
+        this.lastEmittedDoc = lastEmittedDoc;
     }
 
     public long id() {
@@ -59,6 +71,10 @@ public class FetchSearchRequest extends TransportRequest {
         return size;
     }
 
+    public ScoreDoc lastEmittedDoc() {
+        return lastEmittedDoc;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
@@ -68,6 +84,16 @@ public class FetchSearchRequest extends TransportRequest {
         for (int i = 0; i < size; i++) {
             docIds[i] = in.readVInt();
         }
+        if (in.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+            byte flag = in.readByte();
+            if (flag == 1) {
+                lastEmittedDoc = Lucene.readFieldDoc(in);
+            } else if (flag == 2) {
+                lastEmittedDoc = Lucene.readScoreDoc(in);
+            } else if (flag != 0) {
+                throw new IOException("Unknown flag: " + flag);
+            }
+        }
     }
 
     @Override
@@ -78,5 +104,16 @@ public class FetchSearchRequest extends TransportRequest {
         for (int i = 0; i < size; i++) {
             out.writeVInt(docIds[i]);
         }
+        if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
+            if (lastEmittedDoc == null) {
+                out.writeByte((byte) 0);
+            } else if (lastEmittedDoc instanceof FieldDoc) {
+                out.writeByte((byte) 1);
+                Lucene.writeFieldDoc(out, (FieldDoc) lastEmittedDoc);
+            } else {
+                out.writeByte((byte) 2);
+                Lucene.writeScoreDoc(out, lastEmittedDoc);
+            }
+        }
     }
 }

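The flag byte distinguishes the two shapes a resume point can take: a plain `ScoreDoc` is enough for a score-ordered scroll, while a sorted scroll must also carry the per-field sort values, hence `FieldDoc`. A small illustration with made-up values:

```java
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;

class LastEmittedDocShapes {
    public static void main(String[] args) {
        // flag 2: doc id + score is all searchAfter needs to resume by score
        ScoreDoc byScore = new ScoreDoc(42, 1.73f);
        // flag 1: a sorted scroll also needs the sort values of the last hit
        FieldDoc bySort = new FieldDoc(42, Float.NaN, new Object[]{1394L, "v"});
        System.out.println(byScore + " / " + bySort);
    }
}
```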
+ 24 - 0
src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.search.SearchType;
@@ -172,10 +173,13 @@ public class DefaultSearchContext extends SearchContext {
 
     private volatile long keepAlive;
 
+    private ScoreDoc lastEmittedDoc;
+
     private volatile long lastAccessTime = -1;
 
     private List<Releasable> clearables = null;
 
+    private volatile boolean useSlowScroll;
 
     public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
                          Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
@@ -644,6 +648,16 @@ public class DefaultSearchContext extends SearchContext {
         this.keepAlive = keepAlive;
     }
 
+    @Override
+    public void lastEmittedDoc(ScoreDoc doc) {
+        this.lastEmittedDoc = doc;
+    }
+
+    @Override
+    public ScoreDoc lastEmittedDoc() {
+        return lastEmittedDoc;
+    }
+
     public SearchLookup lookup() {
         // TODO: The types should take into account the parsing context in QueryParserContext...
         if (searchLookup == null) {
@@ -705,4 +719,14 @@ public class DefaultSearchContext extends SearchContext {
     public MapperService.SmartNameObjectMapper smartNameObjectMapper(String name) {
         return mapperService().smartNameObjectMapper(name, request.types());
     }
+
+    @Override
+    public boolean useSlowScroll() {
+        return useSlowScroll;
+    }
+
+    public DefaultSearchContext useSlowScroll(boolean useSlowScroll) {
+        this.useSlowScroll = useSlowScroll;
+        return this;
+    }
 }

+ 6 - 2
src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java

@@ -310,14 +310,18 @@ public class InternalSearchHit implements SearchHit {
     public void sortValues(Object[] sortValues) {
         // LUCENE 4 UPGRADE: There must be a better way
         // we want to convert to a Text object here, and not BytesRef
+
+        // Don't write into sortValues! Otherwise the fields array of a FieldDoc that may be used elsewhere (SearchContext#lastEmittedDoc) gets modified.
+        Object[] sortValuesCopy = sortValues != null ? new Object[sortValues.length] : null;
+        if (sortValuesCopy != null) System.arraycopy(sortValues, 0, sortValuesCopy, 0, sortValues.length);
         if (sortValues != null) {
             for (int i = 0; i < sortValues.length; i++) {
                 if (sortValues[i] instanceof BytesRef) {
-                    sortValues[i] = new StringAndBytesText(new BytesArray((BytesRef) sortValues[i]));
+                    sortValuesCopy[i] = new StringAndBytesText(new BytesArray((BytesRef) sortValues[i]));
                 }
             }
         }
-        this.sortValues = sortValues;
+        this.sortValues = sortValuesCopy;
     }
 
     @Override

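Why the copy matters: the `sortValues` passed in can be the very `fields` array of the `FieldDoc` kept as `lastEmittedDoc`, so replacing entries in place would corrupt the resume point that `searchAfter` needs. A sketch of the aliasing hazard the copy avoids:

```java
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.util.BytesRef;

class SharedSortValuesHazard {
    public static void main(String[] args) {
        FieldDoc lastEmittedDoc = new FieldDoc(7, Float.NaN, new Object[]{new BytesRef("abc")});
        Object[] sortValues = lastEmittedDoc.fields;  // the same array, not a copy
        sortValues[0] = "abc";                        // silently rewrites lastEmittedDoc.fields[0]
        System.out.println(lastEmittedDoc.fields[0]); // no longer a BytesRef
    }
}
```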
+ 8 - 0
src/main/java/org/elasticsearch/search/internal/SearchContext.java

@@ -20,6 +20,7 @@ package org.elasticsearch.search.internal;
 
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cache.recycler.CacheRecycler;
@@ -275,6 +276,10 @@ public abstract class SearchContext implements Releasable {
 
     public abstract void keepAlive(long keepAlive);
 
+    public abstract void lastEmittedDoc(ScoreDoc doc);
+
+    public abstract ScoreDoc lastEmittedDoc();
+
     public abstract SearchLookup lookup();
 
     public abstract DfsSearchResult dfsResult();
@@ -297,4 +302,7 @@ public abstract class SearchContext implements Releasable {
 
     public abstract MapperService.SmartNameObjectMapper smartNameObjectMapper(String name);
 
+    public abstract boolean useSlowScroll();
+
+    public abstract SearchContext useSlowScroll(boolean useSlowScroll);
 }

+ 25 - 2
src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

@@ -22,6 +22,7 @@ package org.elasticsearch.search.internal;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.search.type.ParsedScrollId;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -76,10 +77,12 @@ public class ShardSearchRequest extends TransportRequest {
 
     private long nowInMillis;
 
+    private boolean useSlowScroll;
+
     public ShardSearchRequest() {
     }
 
-    public ShardSearchRequest(SearchRequest searchRequest, ShardRouting shardRouting, int numberOfShards) {
+    public ShardSearchRequest(SearchRequest searchRequest, ShardRouting shardRouting, int numberOfShards, boolean useSlowScroll) {
         super(searchRequest);
         this.index = shardRouting.index();
         this.shardId = shardRouting.id();
@@ -92,7 +95,7 @@ public class ShardSearchRequest extends TransportRequest {
         this.templateParams = searchRequest.templateParams();
         this.scroll = searchRequest.scroll();
         this.types = searchRequest.types();
-
+        this.useSlowScroll = useSlowScroll;
     }
 
     public ShardSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchType searchType) {
@@ -188,6 +191,17 @@ public class ShardSearchRequest extends TransportRequest {
         return this;
     }
 
+    /**
+     * This setting is internal and is enabled when at least one node in the cluster is on version 1.0.x or 1.1.x,
+     * so that scrolling falls back to the mechanism those versions support.
+     *
+     * @return Whether scrolling should use regular search requests that increment {@code from} on each round, which
+     * can bring down nodes because priority queues of {@code from + size} entries are built for sorting.
+     */
+    public boolean useSlowScroll() {
+        return useSlowScroll;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
@@ -213,6 +227,12 @@ public class ShardSearchRequest extends TransportRequest {
                 templateParams = (Map<String, String>) in.readGenericValue();
             }
         }
+        if (in.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+            useSlowScroll = in.readBoolean();
+        } else {
+            // This means the request was sent from a 1.0.x or 1.1.x node, so we need to fall back to slow scroll.
+            useSlowScroll = true;
+        }
     }
 
     @Override
@@ -243,5 +263,8 @@ public class ShardSearchRequest extends TransportRequest {
                 out.writeGenericValue(templateParams);
             }
         }
+        if (out.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
+            out.writeBoolean(useSlowScroll);
+        }
     }
 }

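The wire handling above is the usual version gate: the flag is only written when the receiver knows about it, and on read its absence implies the old behavior. A condensed restatement under the same assumptions (helper names are illustrative, not additional API):

```java
import java.io.IOException;

import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

final class UseSlowScrollGate {

    static void write(StreamOutput out, boolean useSlowScroll) throws IOException {
        if (out.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
            out.writeBoolean(useSlowScroll); // receiver understands the flag
        } // otherwise: older receiver, the field is simply absent on the wire
    }

    static boolean read(StreamInput in) throws IOException {
        if (in.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
            return in.readBoolean();
        }
        return true; // sender is 1.0.x/1.1.x: fall back to slow scroll
    }
}
```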
+ 38 - 7
src/main/java/org/elasticsearch/search/query/QueryPhase.java

@@ -21,6 +21,7 @@ package org.elasticsearch.search.query;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TotalHitCountCollector;
 import org.elasticsearch.action.search.SearchType;
@@ -112,15 +113,45 @@ public class QueryPhase implements SearchPhase {
                 topDocs = new TopDocs(collector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
             } else if (searchContext.searchType() == SearchType.SCAN) {
                 topDocs = searchContext.scanContext().execute(searchContext);
-            } else if (searchContext.sort() != null) {
-                topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort(),
-                        searchContext.trackScores(), searchContext.trackScores());
             } else {
-                rescore = !searchContext.rescore().isEmpty();
-                for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
-                    numDocs = Math.max(rescoreContext.window(), numDocs);
+                // Perhaps have a dedicated scroll phase?
+                if (!searchContext.useSlowScroll() && searchContext.request().scroll() != null) {
+                    numDocs = searchContext.size();
+                    ScoreDoc lastEmittedDoc = searchContext.lastEmittedDoc();
+                    if (searchContext.sort() != null) {
+                        topDocs = searchContext.searcher().searchAfter(
+                                lastEmittedDoc, query, null, numDocs, searchContext.sort(),
+                                searchContext.trackScores(), searchContext.trackScores()
+                        );
+                    } else {
+                        rescore = !searchContext.rescore().isEmpty();
+                        for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
+                            numDocs = Math.max(rescoreContext.window(), numDocs);
+                        }
+                        topDocs = searchContext.searcher().searchAfter(lastEmittedDoc, query, numDocs);
+                    }
+
+                    int size = topDocs.scoreDocs.length;
+                    if (size > 0) {
+                        // In the case of *_QUERY_AND_FETCH we don't go back to the shards to tell them which docs
+                        // were emitted as hits, so we can simply mark the last doc as the last emitted one
+                        if (searchContext.searchType() == SearchType.QUERY_AND_FETCH ||
+                                searchContext.searchType() == SearchType.DFS_QUERY_AND_FETCH) {
+                            searchContext.lastEmittedDoc(topDocs.scoreDocs[size - 1]);
+                        }
+                    }
+                } else {
+                    if (searchContext.sort() != null) {
+                        topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort(),
+                                searchContext.trackScores(), searchContext.trackScores());
+                    } else {
+                        rescore = !searchContext.rescore().isEmpty();
+                        for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
+                            numDocs = Math.max(rescoreContext.window(), numDocs);
+                        }
+                        topDocs = searchContext.searcher().search(query, numDocs);
+                    }
                 }
-                topDocs = searchContext.searcher().search(query, numDocs);
             }
             searchContext.queryResult().topDocs(topDocs);
         } catch (Throwable e) {

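This is the heart of the commit: instead of re-running the query with an ever-growing `from`, each scroll round resumes from the previous round's last hit, so Lucene's priority queue never holds more than `size` entries. A minimal sketch of the pattern against a plain `IndexSearcher`, assuming an already-open reader:

```java
import java.io.IOException;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

final class SearchAfterScroll {

    static void drain(IndexReader reader, int size) throws IOException {
        IndexSearcher searcher = new IndexSearcher(reader);
        ScoreDoc lastEmittedDoc = null; // null on the first round
        while (true) {
            // the priority queue holds at most `size` entries, however deep we scroll
            TopDocs page = searcher.searchAfter(lastEmittedDoc, new MatchAllDocsQuery(), size);
            if (page.scoreDocs.length == 0) {
                break; // drained
            }
            // ... process page.scoreDocs ...
            lastEmittedDoc = page.scoreDocs[page.scoreDocs.length - 1];
        }
    }
}
```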
+ 15 - 0
src/main/java/org/elasticsearch/search/sort/SortBuilder.java

@@ -19,13 +19,28 @@
 
 package org.elasticsearch.search.sort;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 
 /**
  *
  */
 public abstract class SortBuilder implements ToXContent {
 
+    @Override
+    public String toString() {
+        try {
+            XContentBuilder builder = XContentFactory.jsonBuilder();
+            builder.prettyPrint();
+            toXContent(builder, EMPTY_PARAMS);
+            return builder.string();
+        } catch (Exception e) {
+            throw new ElasticsearchException("Failed to build sort", e);
+        }
+    }
+
     /**
      * The order of sorting. Defaults to {@link SortOrder#ASC}.
      */

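The new `toString()` makes sort builders readable in logs; the randomized scroll tests below rely on it when reporting the picked sort. A sketch of the intended usage (the exact JSON shape depends on the concrete builder's `toXContent`):

```java
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;

class SortToStringExample {
    public static void main(String[] args) {
        SortBuilder sort = SortBuilders.fieldSort("field1").missing(1).order(SortOrder.DESC);
        System.out.println(sort); // pretty-printed JSON rendering of the sort definition
    }
}
```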
+ 20 - 0
src/test/java/org/elasticsearch/index/search/child/TestSearchContext.java

@@ -20,6 +20,7 @@ package org.elasticsearch.index.search.child;
 
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.search.SearchType;
@@ -526,6 +527,15 @@ public class TestSearchContext extends SearchContext {
     public void keepAlive(long keepAlive) {
     }
 
+    @Override
+    public void lastEmittedDoc(ScoreDoc doc) {
+    }
+
+    @Override
+    public ScoreDoc lastEmittedDoc() {
+        return null;
+    }
+
     @Override
     public SearchLookup lookup() {
         return null;
@@ -583,4 +593,14 @@ public class TestSearchContext extends SearchContext {
     public boolean release() throws ElasticsearchException {
         return false;
     }
+
+    @Override
+    public boolean useSlowScroll() {
+        return false;
+    }
+
+    @Override
+    public SearchContext useSlowScroll(boolean useSlowScroll) {
+        return null;
+    }
 }

+ 18 - 9
src/test/java/org/elasticsearch/nested/SimpleNestedTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -824,11 +825,15 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
                 .endObject()).execute().actionGet();
         refresh();
 
-        SearchResponse searchResponse = client().prepareSearch("test")
-                .setTypes("type1")
+        SearchRequestBuilder searchRequestBuilder = client().prepareSearch("test").setTypes("type1")
                 .setQuery(QueryBuilders.matchAllQuery())
-                .addSort(SortBuilders.fieldSort("nested1.field1").setNestedFilter(termFilter("nested1.field2", true)).missing(10).order(SortOrder.ASC))
-                .execute().actionGet();
+                .addSort(SortBuilders.fieldSort("nested1.field1").setNestedFilter(termFilter("nested1.field2", true)).missing(10).order(SortOrder.ASC));
+
+        if (randomBoolean()) {
+            searchRequestBuilder.setScroll("10m");
+        }
+
+        SearchResponse searchResponse = searchRequestBuilder.get();
 
         assertHitCount(searchResponse, 3);
         assertThat(searchResponse.getHits().hits()[0].id(), equalTo("2"));
@@ -838,11 +843,14 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
         assertThat(searchResponse.getHits().hits()[2].id(), equalTo("3"));
         assertThat(searchResponse.getHits().hits()[2].sortValues()[0].toString(), equalTo("10"));
 
-        searchResponse = client().prepareSearch("test")
-                .setTypes("type1")
-                .setQuery(QueryBuilders.matchAllQuery())
-                .addSort(SortBuilders.fieldSort("nested1.field1").setNestedFilter(termFilter("nested1.field2", true)).missing(10).order(SortOrder.DESC))
-                .execute().actionGet();
+        searchRequestBuilder = client().prepareSearch("test").setTypes("type1") .setQuery(QueryBuilders.matchAllQuery())
+                .addSort(SortBuilders.fieldSort("nested1.field1").setNestedFilter(termFilter("nested1.field2", true)).missing(10).order(SortOrder.DESC));
+
+        if (randomBoolean()) {
+            searchRequestBuilder.setScroll("10m");
+        }
+
+        searchResponse = searchRequestBuilder.get();
 
         assertHitCount(searchResponse, 3);
         assertThat(searchResponse.getHits().hits()[0].id(), equalTo("3"));
@@ -851,6 +859,7 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
         assertThat(searchResponse.getHits().hits()[1].sortValues()[0].toString(), equalTo("5"));
         assertThat(searchResponse.getHits().hits()[2].id(), equalTo("2"));
         assertThat(searchResponse.getHits().hits()[2].sortValues()[0].toString(), equalTo("2"));
+        client().prepareClearScroll().addScrollId("_all").get();
     }
 
     @Test

+ 229 - 0
src/test/java/org/elasticsearch/search/scroll/DuelScrollTests.java

@@ -0,0 +1,229 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.scroll;
+
+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ */
+public class DuelScrollTests extends ElasticsearchIntegrationTest {
+
+    @Test
+    public void testDuel_queryThenFetch() throws Exception {
+        TestContext context = create(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH);
+
+        SearchResponse control = client().prepareSearch("index")
+                .setSearchType(context.searchType)
+                .addSort(context.sort)
+                .setSize(context.numDocs).get();
+        assertNoFailures(control);
+        SearchHits sh = control.getHits();
+        assertThat(sh.totalHits(), equalTo((long) context.numDocs));
+        assertThat(sh.getHits().length, equalTo(context.numDocs));
+
+        SearchResponse searchScrollResponse = client().prepareSearch("index")
+                .setSearchType(context.searchType)
+                .addSort(context.sort)
+                .setSize(context.scrollRequestSize)
+                .setScroll("10m").get();
+
+        assertNoFailures(searchScrollResponse);
+        assertThat(searchScrollResponse.getHits().getTotalHits(), equalTo((long) context.numDocs));
+        assertThat(searchScrollResponse.getHits().hits().length, equalTo(context.scrollRequestSize));
+
+        int counter = 0;
+        for (SearchHit hit : searchScrollResponse.getHits()) {
+            assertThat(hit.sortValues()[0], equalTo(sh.getAt(counter++).sortValues()[0]));
+        }
+
+        int iter = 1;
+        String scrollId = searchScrollResponse.getScrollId();
+        while (true) {
+            searchScrollResponse = client().prepareSearchScroll(scrollId).setScroll("10m").get();
+            assertNoFailures(searchScrollResponse);
+            assertThat(searchScrollResponse.getHits().getTotalHits(), equalTo((long) context.numDocs));
+            if (searchScrollResponse.getHits().hits().length == 0) {
+                break;
+            }
+
+            int expectedLength;
+            int scrollSlice = ++iter * context.scrollRequestSize;
+            if (scrollSlice <= context.numDocs) {
+                expectedLength = context.scrollRequestSize;
+            } else {
+                expectedLength = context.scrollRequestSize - (scrollSlice - context.numDocs);
+            }
+            assertThat(searchScrollResponse.getHits().hits().length, equalTo(expectedLength));
+            for (SearchHit hit : searchScrollResponse.getHits()) {
+                assertThat(hit.sortValues()[0], equalTo(sh.getAt(counter++).sortValues()[0]));
+            }
+            scrollId = searchScrollResponse.getScrollId();
+        }
+
+        assertThat(counter, equalTo(context.numDocs));
+        clearScroll(scrollId);
+    }
+
+    @Test
+    public void testDuel_queryAndFetch() throws Exception {
+        // *_QUERY_AND_FETCH search types are tricky: the ordering can be incorrect, since each round returns num_shards * (from + size)
+        // hits, so a subsequent scroll call can return hits that should have been part of the first scroll call's hits.
+
+        TestContext context = create(SearchType.DFS_QUERY_AND_FETCH, SearchType.QUERY_AND_FETCH);
+        SearchResponse searchScrollResponse = client().prepareSearch("index")
+                .setSearchType(context.searchType)
+                .addSort(context.sort)
+                .setSize(context.scrollRequestSize)
+                .setScroll("10m").get();
+
+        assertNoFailures(searchScrollResponse);
+        assertThat(searchScrollResponse.getHits().getTotalHits(), equalTo((long) context.numDocs));
+
+        int counter = searchScrollResponse.getHits().hits().length;
+        String scrollId = searchScrollResponse.getScrollId();
+        while (true) {
+            searchScrollResponse = client().prepareSearchScroll(scrollId).setScroll("10m").get();
+            assertNoFailures(searchScrollResponse);
+            assertThat(searchScrollResponse.getHits().getTotalHits(), equalTo((long) context.numDocs));
+            if (searchScrollResponse.getHits().hits().length == 0) {
+                break;
+            }
+
+            counter += searchScrollResponse.getHits().hits().length;
+            scrollId = searchScrollResponse.getScrollId();
+        }
+
+        assertThat(counter, equalTo(context.numDocs));
+        clearScroll(scrollId);
+    }
+
+
+    private TestContext create(SearchType... searchTypes) throws Exception {
+        assertAcked(prepareCreate("index").addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
+                .startObject("field1")
+                    .field("type", "long")
+                .endObject()
+                .startObject("field2")
+                    .field("type", "string")
+                .endObject()
+                .startObject("nested")
+                    .field("type", "nested")
+                    .startObject("properties")
+                        .startObject("field3")
+                            .field("type", "long")
+                        .endObject()
+                        .startObject("field4")
+                            .field("type", "string")
+                        .endObject()
+                    .endObject()
+                .endObject()
+                .endObject().endObject().endObject()));
+
+        int numDocs = 2 + randomInt(512);
+        int scrollRequestSize = randomIntBetween(1, rarely() ? numDocs : numDocs / 2);
+        boolean unevenRouting = randomBoolean();
+
+        int numMissingDocs = atMost(numDocs / 100);
+        IntOpenHashSet missingDocs = new IntOpenHashSet(numMissingDocs);
+        for (int i = 0; i < numMissingDocs; i++) {
+            while (!missingDocs.add(randomInt(numDocs))) {}
+        }
+
+        for (int i = 1; i <= numDocs; i++) {
+            IndexRequestBuilder indexRequestBuilder = client()
+                    .prepareIndex("index", "type", String.valueOf(i));
+            if (missingDocs.contains(i)) {
+                indexRequestBuilder.setSource("x", "y");
+            } else {
+                indexRequestBuilder.setSource(jsonBuilder().startObject()
+                        .field("field1", i)
+                        .field("field2", String.valueOf(i))
+                        .startObject("nested")
+                            .field("field3", i)
+                            .field("field4", String.valueOf(i))
+                        .endObject()
+                        .endObject());
+            }
+
+            if (unevenRouting && randomInt(3) <= 2) {
+                indexRequestBuilder.setRouting("a");
+            }
+            indexRandom(false, indexRequestBuilder);
+        }
+        refresh();
+
+        final SortBuilder sort;
+        if (randomBoolean()) {
+            if (randomBoolean()) {
+                sort = SortBuilders.fieldSort("field1").missing(1);
+            } else {
+                sort = SortBuilders.fieldSort("field2")
+                        .missing("1");
+            }
+        } else {
+            if (randomBoolean()) {
+                sort = SortBuilders.fieldSort("nested.field3").missing(1);
+            } else {
+                sort = SortBuilders.fieldSort("nested.field4").missing("1");
+            }
+        }
+        sort.order(randomBoolean() ? SortOrder.ASC : SortOrder.DESC);
+
+        SearchType searchType = RandomPicks.randomFrom(getRandom(), Arrays.asList(searchTypes));
+
+        logger.info("numDocs={}, scrollRequestSize={}, sort={}, searchType={}", numDocs, scrollRequestSize, sort, searchType);
+        return new TestContext(numDocs, scrollRequestSize, sort, searchType);
+    }
+
+
+    class TestContext {
+
+        final int numDocs;
+        final int scrollRequestSize;
+        final SortBuilder sort;
+        final SearchType searchType;
+
+        TestContext(int numDocs, int scrollRequestSize, SortBuilder sort, SearchType searchType) {
+            this.numDocs = numDocs;
+            this.scrollRequestSize = scrollRequestSize;
+            this.sort = sort;
+            this.searchType = searchType;
+        }
+    }
+
+}

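As a concrete instance of the expected-length arithmetic in `testDuel_queryThenFetch` above: with `numDocs = 10` and `scrollRequestSize = 4`, the rounds return 4, 4, and 2 hits; on the third round `scrollSlice = 3 * 4 = 12 > 10`, so `expectedLength = 4 - (12 - 10) = 2`.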
+ 48 - 0
src/test/java/org/elasticsearch/search/scroll/SlowDuelScrollTests.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.scroll;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.type.ParsedScrollId;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+
+/**
+ */
+@LuceneTestCase.Slow
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
+public class SlowDuelScrollTests extends DuelScrollTests {
+
+    private final Version[] versions = new Version[]{
+            Version.CURRENT, ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION, Version.V_1_1_0, Version.V_1_0_0
+    };
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        // If we add a constructor to InternalNode that allows us to define a version, we could start nodes with
+        // different versions in the TestCluster; then we wouldn't need this setting, which would also be helpful
+        // for other tests
+        Settings settings = super.nodeSettings(nodeOrdinal);
+        Version randomVersion = versions[randomInt(versions.length - 1)];
+        return ImmutableSettings.builder().put(settings).put("tests.mock.version", randomVersion.id).build();
+    }
+}

+ 48 - 0
src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.scroll;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.type.ParsedScrollId;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+
+/**
+ */
+@LuceneTestCase.Slow
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
+public class SlowSearchScrollTests extends SearchScrollTests {
+
+    private final Version[] versions = new Version[]{
+            Version.CURRENT, ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION, Version.V_1_1_0, Version.V_1_0_0
+    };
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        // If we add a constructor to InternalNode that allows us to define a version, we could start nodes with
+        // different versions in the TestCluster; then we wouldn't need this setting, which would also be helpful
+        // for other tests
+        Settings settings = super.nodeSettings(nodeOrdinal);
+        Version randomVersion = versions[randomInt(versions.length - 1)];
+        return ImmutableSettings.builder().put(settings).put("tests.mock.version", randomVersion.id).build();
+    }
+}