Optimize sort on long field (#48804)

* Optimize sort on numeric long and date fields (#39770)

Optimize sort on numeric long and date fields when
the system property `es.search.long_sort_optimized` is true.
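
The gist of the rewrite, as a minimal sketch (assuming Lucene 8.x point APIs; `rewriteAscLongSort` is a hypothetical helper, and null checks are omitted): an ascending sort on a long field becomes a distance feature query whose origin is the index minimum, so the highest-scoring docs are exactly those the sort would return first, and collection can terminate early.

```
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.Query;

import java.io.IOException;

// Sketch: rewrite an ascending sort on a long field into a query whose
// top scores correspond to the smallest field values.
static Query rewriteAscLongSort(IndexReader reader, String field) throws IOException {
    long min = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, field), 0);
    long max = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, field), 0);
    long origin = min;                           // ascending: docs near the min score highest
    long pivot = Math.max(1, (max - min) >>> 1); // unsigned halving avoids overflow
    return LongPoint.newDistanceFeatureQuery(field, 1f, origin, pivot);
}
```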

* Skip optimization if the index has duplicate data (#43121)

Skip sort optimization if the index has 50% or more data
with the same value.
When an index has a lot of docs with the same value, the sort
optimization doesn't make sense, as DistanceFeatureQuery
will produce the same score for these docs, and Lucene
will use the secondary sort to break ties. This can be slower
than regular sorting.
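
A condensed sketch of the duplicate check (the full version, `indexFieldHasDuplicateData`, is in the QueryPhase diff below; `estimatePointCount` is the visitor-based estimate defined there):

```
// Narrow [min, max] toward whichever half holds the majority of docs;
// if a majority half survives until min == max, more than 50% of the
// docs likely share a single value.
static boolean hasDuplicateData(PointValues values, long min, long max) throws IOException {
    long half = values.getDocCount() / 2;
    while (min < max) {
        long mid = Math.floorDiv(min, 2) + Math.floorDiv(max, 2); // overflow-safe midpoint
        long left = estimatePointCount(values, min, mid);
        long right = estimatePointCount(values, mid + 1, max);
        if (left >= right && left > half) {
            max = mid;
        } else if (right > left && right > half) {
            min = mid + 1;
        } else {
            return false; // neither half dominates: values are spread out
        }
    }
    return true;
}
```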

* Sort leaves on search according to the primary numeric sort field (#44021)

This change pre-sorts the index reader leaves (segments) prior to search
when the primary sort is a numeric field eligible for the distance feature
optimization. It also adds a tie-breaker on `_doc` to the rewritten sort
in order to work around the fact that leaves will be collected in a random order.
I ran this patch on the http_logs benchmark and the results are very promising:

```
|                                       50th percentile latency | desc_sort_timestamp |    220.706 |      136544 |   136324 |     ms |
|                                       90th percentile latency | desc_sort_timestamp |    244.847 |      162084 |   161839 |     ms |
|                                       99th percentile latency | desc_sort_timestamp |    316.627 |      172005 |   171688 |     ms |
|                                      100th percentile latency | desc_sort_timestamp |    335.306 |      173325 |   172989 |     ms |
|                                  50th percentile service time | desc_sort_timestamp |    218.369 |     1968.11 |  1749.74 |     ms |
|                                  90th percentile service time | desc_sort_timestamp |    244.182 |      2447.2 |  2203.02 |     ms |
|                                  99th percentile service time | desc_sort_timestamp |    313.176 |     2950.85 |  2637.67 |     ms |
|                                 100th percentile service time | desc_sort_timestamp |    332.924 |     2959.38 |  2626.45 |     ms |
|                                                    error rate | desc_sort_timestamp |          0 |           0 |        0 |      % |
|                                                Min Throughput |  asc_sort_timestamp |   0.801824 |    0.800855 | -0.00097 |  ops/s |
|                                             Median Throughput |  asc_sort_timestamp |   0.802595 |    0.801104 | -0.00149 |  ops/s |
|                                                Max Throughput |  asc_sort_timestamp |   0.803282 |    0.801351 | -0.00193 |  ops/s |
|                                       50th percentile latency |  asc_sort_timestamp |    220.761 |     824.098 |  603.336 |     ms |
|                                       90th percentile latency |  asc_sort_timestamp |    251.741 |     853.984 |  602.243 |     ms |
|                                       99th percentile latency |  asc_sort_timestamp |    368.761 |     893.943 |  525.182 |     ms |
|                                      100th percentile latency |  asc_sort_timestamp |    431.042 |      908.85 |  477.808 |     ms |
|                                  50th percentile service time |  asc_sort_timestamp |    218.547 |     820.757 |  602.211 |     ms |
|                                  90th percentile service time |  asc_sort_timestamp |    249.578 |     849.886 |  600.308 |     ms |
|                                  99th percentile service time |  asc_sort_timestamp |    366.317 |     888.894 |  522.577 |     ms |
|                                 100th percentile service time |  asc_sort_timestamp |    430.952 |     908.401 |   477.45 |     ms |
|                                                    error rate |  asc_sort_timestamp |          0 |           0 |        0 |      % |
```

So roughly 10x faster for the descending sort and 2-3x faster in the ascending case. Note
that I indexed the http_logs with a single client in order to simulate real time-based indices
where documents are indexed in their timestamp order.

Relates #37043
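
The leaf pre-sorting described above reduces to ordering segments by the minimum (or, for descending sorts, maximum) packed value of the sort field. A condensed sketch mirroring `createLeafSorter` in the diff below (`sortLeaves` is a hypothetical helper; assumes `leaves` is the reader's full leaf list, so `ctx.ord` indexes it):

```
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;

// Order segments so the ones most likely to hold the top hits are visited first.
static void sortLeaves(List<LeafReaderContext> leaves, String field, boolean reverse) throws IOException {
    long[] keys = new long[leaves.size()];
    for (LeafReaderContext ctx : leaves) {
        PointValues values = ctx.reader().getPointValues(field);
        byte[] packed = values == null ? null
            : (reverse ? values.getMaxPackedValue() : values.getMinPackedValue());
        // segments without values for the field sort last in either direction
        keys[ctx.ord] = packed == null ? (reverse ? Long.MIN_VALUE : Long.MAX_VALUE)
            : LongPoint.decodeDimension(packed, 0);
    }
    Comparator<LeafReaderContext> cmp = Comparator.comparingLong(l -> keys[l.ord]);
    leaves.sort(reverse ? cmp.reversed() : cmp);
}
```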

* Remove nested collector in docs response

As we no longer use CancellableCollector, it should be removed from
the expected docs response.

* Use collector manager for search when necessary (#45829)

When we optimize sort, we sort segments by their min/max values.
As a collector expects to receive segments in docID order,
we cannot use a single collector across the sorted segments.
For such a case we instead use a CollectorManager,
which creates a dedicated collector for every segment.
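
Concretely, a CollectorManager is just a per-segment collector factory plus a merge step. A minimal sketch against Lucene 8.x (`PerSegmentTopFieldManager` is illustrative only; the actual change uses the shared manager described next):

```
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;

import java.io.IOException;
import java.util.Collection;

// Minimal manager: a fresh TopFieldCollector per segment, merged in reduce().
class PerSegmentTopFieldManager implements CollectorManager<TopFieldCollector, TopFieldDocs> {
    private final Sort sort;
    private final int numHits;

    PerSegmentTopFieldManager(Sort sort, int numHits) {
        this.sort = sort;
        this.numHits = numHits;
    }

    @Override
    public TopFieldCollector newCollector() {
        return TopFieldCollector.create(sort, numHits, Integer.MAX_VALUE);
    }

    @Override
    public TopFieldDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
        TopFieldDocs[] perSegment = new TopFieldDocs[collectors.size()];
        int i = 0;
        for (TopFieldCollector collector : collectors) {
            perSegment[i++] = (TopFieldDocs) collector.topDocs(); // TopFieldCollector produces TopFieldDocs
        }
        return TopDocs.merge(sort, numHits, perSegment);
    }
}
```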

* Use shared TopFieldCollector manager

Use a shared TopFieldCollector manager for sort optimization.
This collector manager is able to exchange the minimum competitive
score between collectors.
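
Usage is a one-liner against the Lucene 8.x API (as in the diff below; `sort`, `numHits`, and `totalHitsThreshold` are placeholders, and `null` means no search-after doc):

```
// Collectors created by this manager exchange the minimum competitive score,
// so every segment can skip docs that cannot make the top hits.
CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager =
    TopFieldCollector.createSharedManager(sort, numHits, null, totalHitsThreshold);
```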

* Correct calculation of avg value to avoid overflow
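
For example, the naive midpoint of two large longs overflows, while dividing each operand first stays in range (the values here are arbitrary):

```
long min = Long.MAX_VALUE - 10, max = Long.MAX_VALUE;
long naive = (min + max) / 2;                              // overflows to a negative value
long safe = Math.floorDiv(min, 2) + Math.floorDiv(max, 2); // correct midpoint (within 1)
```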

* Optimize calculating if index has duplicate data
Mayya Sharipova, 5 years ago
Parent
Commit
79d9b365c4

+ 3 - 0
buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy

@@ -720,6 +720,9 @@ class BuildPlugin implements Plugin<Project> {
                 // TODO: remove this once ctx isn't added to update script params in 7.0
                 test.systemProperty 'es.scripting.update.ctx_in_params', 'false'
 
+                // TODO: remove this property in 8.0
+                test.systemProperty 'es.search.rewrite_sort', 'true'
+
                 // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
                 test.systemProperty 'es.transport.cname_in_publish_address', 'true'
 

+ 20 - 41
docs/reference/search/profile.asciidoc

@@ -153,16 +153,9 @@ The API returns the following result:
                  "rewrite_time": 51443,
                  "collector": [
                     {
-                       "name": "CancellableCollector",
-                       "reason": "search_cancelled",
-                       "time_in_nanos": "304311",
-                       "children": [
-                         {
-                           "name": "SimpleTopScoreDocCollector",
-                           "reason": "search_top_hits",
-                           "time_in_nanos": "32273"
-                         }
-                       ]
+                       "name": "SimpleTopScoreDocCollector",
+                       "reason": "search_top_hits",
+                       "time_in_nanos": "32273"
                     }
                  ]
               }
@@ -445,16 +438,9 @@ Looking at the previous example:
 --------------------------------------------------
 "collector": [
    {
-      "name": "CancellableCollector",
-      "reason": "search_cancelled",
-      "time_in_nanos": "304311",
-      "children": [
-        {
-          "name": "SimpleTopScoreDocCollector",
-          "reason": "search_top_hits",
-          "time_in_nanos": "32273"
-        }
-      ]
+      "name": "SimpleTopScoreDocCollector",
+      "reason": "search_top_hits",
+      "time_in_nanos": "32273"
    }
 ]
 --------------------------------------------------
@@ -657,33 +643,26 @@ The API returns the following result:
                      "rewrite_time": 7208,
                      "collector": [
                         {
-                          "name": "CancellableCollector",
-                          "reason": "search_cancelled",
-                          "time_in_nanos": 2390,
+                          "name": "MultiCollector",
+                          "reason": "search_multi",
+                          "time_in_nanos": 1820,
                           "children": [
                             {
-                              "name": "MultiCollector",
-                              "reason": "search_multi",
-                              "time_in_nanos": 1820,
+                              "name": "FilteredCollector",
+                              "reason": "search_post_filter",
+                              "time_in_nanos": 7735,
                               "children": [
                                 {
-                                  "name": "FilteredCollector",
-                                  "reason": "search_post_filter",
-                                  "time_in_nanos": 7735,
-                                  "children": [
-                                    {
-                                      "name": "SimpleTopScoreDocCollector",
-                                      "reason": "search_top_hits",
-                                      "time_in_nanos": 1328
-                                    }
-                                  ]
-                                },
-                                {
-                                  "name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
-                                  "reason": "aggregation",
-                                  "time_in_nanos": 8273
+                                  "name": "SimpleTopScoreDocCollector",
+                                  "reason": "search_top_hits",
+                                  "time_in_nanos": 1328
                                 }
                               ]
+                            },
+                            {
+                              "name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
+                              "reason": "aggregation",
+                              "time_in_nanos": 8273
                             }
                           ]
                         }

+ 86 - 42
server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

@@ -27,6 +27,7 @@ import org.apache.lucene.search.BulkScorer;
 import org.apache.lucene.search.CollectionStatistics;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConjunctionDISI;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
@@ -35,9 +36,12 @@ import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.TermStatistics;
+import org.apache.lucene.search.TopFieldDocs;
+import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.util.BitSet;
@@ -45,14 +49,18 @@ import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.CombinedBitSet;
 import org.apache.lucene.util.SparseFixedBitSet;
+import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.profile.Timer;
 import org.elasticsearch.search.profile.query.ProfileWeight;
 import org.elasticsearch.search.profile.query.QueryProfileBreakdown;
 import org.elasticsearch.search.profile.query.QueryProfiler;
 import org.elasticsearch.search.profile.query.QueryTimingType;
+import org.elasticsearch.search.query.QuerySearchResult;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -131,12 +139,86 @@ public class ContextIndexSearcher extends IndexSearcher {
         }
     }
 
+    private void checkCancelled() {
+        if (checkCancelled != null) {
+            checkCancelled.run();
+        }
+    }
+
+    public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
+            QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
+        final List<Collector> collectors = new ArrayList<>(leaves.size());
+        for (LeafReaderContext ctx : leaves) {
+            final Collector collector = manager.newCollector();
+            searchLeaf(ctx, weight, collector);
+            collectors.add(collector);
+        }
+        TopFieldDocs mergedTopDocs = (TopFieldDocs) manager.reduce(collectors);
+        // Lucene sets shard indexes during merging of topDocs from different collectors
+        // We need to reset shard index; ES will set shard index later during reduce stage
+        for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
+            scoreDoc.shardIndex = -1;
+        }
+        if (totalHits != null) { // we have already precalculated totalHits for the whole index
+            mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
+        }
+        result.topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), formats);
+    }
+
     @Override
     protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        final Weight cancellableWeight;
-        if (checkCancelled != null) {
-            cancellableWeight = new Weight(weight.getQuery()) {
+        for (LeafReaderContext ctx : leaves) { // search each subreader
+            searchLeaf(ctx, weight, collector);
+        }
+    }
+
+    /**
+     * Lower-level search API.
+     *
+     * {@link LeafCollector#collect(int)} is called for every matching document in
+     * the provided <code>ctx</code>.
+     */
+    private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+        checkCancelled();
+        weight = wrapWeight(weight);
+        final LeafCollector leafCollector;
+        try {
+            leafCollector = collector.getLeafCollector(ctx);
+        } catch (CollectionTerminatedException e) {
+            // there is no doc of interest in this reader context
+            // continue with the following leaf
+            return;
+        }
+        Bits liveDocs = ctx.reader().getLiveDocs();
+        BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
+        if (liveDocsBitSet == null) {
+            BulkScorer bulkScorer = weight.bulkScorer(ctx);
+            if (bulkScorer != null) {
+                try {
+                    bulkScorer.score(leafCollector, liveDocs);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        } else {
+            // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
+            Scorer scorer = weight.scorer(ctx);
+            if (scorer != null) {
+                try {
+                    intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
+                        checkCancelled == null ? () -> { } : checkCancelled);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        }
+    }
 
+    private Weight wrapWeight(Weight weight) {
+        if (checkCancelled != null) {
+            return new Weight(weight.getQuery()) {
                 @Override
                 public void extractTerms(Set<Term> terms) {
                     throw new UnsupportedOperationException();
@@ -168,48 +250,10 @@ public class ContextIndexSearcher extends IndexSearcher {
                 }
             };
         } else {
-            cancellableWeight = weight;
+            return weight;
         }
-        searchInternal(leaves, cancellableWeight, collector);
     }
 
-    private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        for (LeafReaderContext ctx : leaves) { // search each subreader
-            final LeafCollector leafCollector;
-            try {
-                leafCollector = collector.getLeafCollector(ctx);
-            } catch (CollectionTerminatedException e) {
-                // there is no doc of interest in this reader context
-                // continue with the following leaf
-                continue;
-            }
-            Bits liveDocs = ctx.reader().getLiveDocs();
-            BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
-            if (liveDocsBitSet == null) {
-                BulkScorer bulkScorer = weight.bulkScorer(ctx);
-                if (bulkScorer != null) {
-                    try {
-                        bulkScorer.score(leafCollector, liveDocs);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            } else {
-                // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
-                Scorer scorer = weight.scorer(ctx);
-                if (scorer != null) {
-                    try {
-                        intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
-                            checkCancelled == null ? () -> {} : checkCancelled);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            }
-        }
-    }
 
     private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
         if (liveDocs instanceof SparseFixedBitSet) {

+ 0 - 2
server/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java

@@ -49,8 +49,6 @@ public class CollectorResult implements ToXContentObject, Writeable {
     public static final String REASON_SEARCH_POST_FILTER = "search_post_filter";
     public static final String REASON_SEARCH_MIN_SCORE = "search_min_score";
     public static final String REASON_SEARCH_MULTI = "search_multi";
-    public static final String REASON_SEARCH_TIMEOUT = "search_timeout";
-    public static final String REASON_SEARCH_CANCELLED = "search_cancelled";
     public static final String REASON_AGGREGATION = "aggregation";
     public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global";
 

+ 0 - 53
server/src/main/java/org/elasticsearch/search/query/CancellableCollector.java

@@ -1,53 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.query;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.FilterCollector;
-import org.apache.lucene.search.LeafCollector;
-import org.elasticsearch.tasks.TaskCancelledException;
-
-import java.io.IOException;
-import java.util.function.BooleanSupplier;
-
-/**
- * Collector that checks if the task it is executed under is cancelled.
- */
-public class CancellableCollector extends FilterCollector {
-    private final BooleanSupplier cancelled;
-
-    /**
-     * Constructor
-     * @param cancelled supplier of the cancellation flag, the supplier will be called for each segment
-     * @param in wrapped collector
-     */
-    public CancellableCollector(BooleanSupplier cancelled, Collector in) {
-        super(in);
-        this.cancelled = cancelled;
-    }
-
-    @Override
-    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
-        if (cancelled.getAsBoolean()) {
-            throw new TaskCancelledException("cancelled");
-        }
-        return super.getLeafCollector(context);
-    }
-}

+ 0 - 15
server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java

@@ -28,16 +28,13 @@ import org.apache.lucene.search.Weight;
 import org.elasticsearch.common.lucene.MinimumScoreCollector;
 import org.elasticsearch.common.lucene.search.FilteredCollector;
 import org.elasticsearch.search.profile.query.InternalProfileCollector;
-import org.elasticsearch.tasks.TaskCancelledException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BooleanSupplier;
 
-import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_CANCELLED;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
@@ -150,18 +147,6 @@ abstract class QueryCollectorContext {
         };
     }
 
-    /**
-     * Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
-     */
-    static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier cancelled) {
-        return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
-            @Override
-            Collector create(Collector in) throws IOException {
-                return new CancellableCollector(cancelled, in);
-            }
-        };
-    }
-
     /**
      * Creates collector limiting the collection to the first <code>numHits</code> documents
      */

+ 345 - 53
server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

@@ -21,26 +21,40 @@ package org.elasticsearch.search.query;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
 import org.apache.lucene.queries.MinDocQuery;
 import org.apache.lucene.queries.SearchAfterSortedDocQuery;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.FieldDoc;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopFieldCollector;
+import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.TotalHits;
+import org.apache.lucene.search.Weight;
 import org.elasticsearch.action.search.SearchTask;
+import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
+import org.elasticsearch.index.IndexSortConfig;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchPhase;
 import org.elasticsearch.search.SearchService;
@@ -57,16 +71,21 @@ import org.elasticsearch.search.suggest.SuggestPhase;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
 
-import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createMultiCollectorContext;
 import static org.elasticsearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
+import static org.elasticsearch.search.query.TopDocsCollectorContext.shortcutTotalHitCount;
 
 
 /**
@@ -75,6 +94,8 @@ import static org.elasticsearch.search.query.TopDocsCollectorContext.createTopDo
  */
 public class QueryPhase implements SearchPhase {
     private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
+    // TODO: remove this property in 8.0
+    public static final boolean SYS_PROP_REWRITE_SORT = Booleans.parseBoolean(System.getProperty("es.search.rewrite_sort", "true"));
 
     private final AggregationPhase aggregationPhase;
     private final SuggestPhase suggestPhase;
@@ -97,7 +118,7 @@ public class QueryPhase implements SearchPhase {
             suggestPhase.execute(searchContext);
             searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
                     new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
-                    new DocValueFormat[0]);
+                new DocValueFormat[0]);
             return;
         }
 
@@ -109,8 +130,7 @@ public class QueryPhase implements SearchPhase {
         // request, preProcess is called on the DFS phase phase, this is why we pre-process them
         // here to make sure it happens during the QUERY phase
         aggregationPhase.preProcess(searchContext);
-        final ContextIndexSearcher searcher = searchContext.searcher();
-        boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);
+        boolean rescore = executeInternal(searchContext);
 
         if (rescore) { // only if we do a regular search
             rescorePhase.execute(searchContext);
@@ -120,7 +140,7 @@ public class QueryPhase implements SearchPhase {
 
         if (searchContext.getProfilers() != null) {
             ProfileShardResult shardResults = SearchProfileShardResults
-                    .buildShardResults(searchContext.getProfilers());
+                .buildShardResults(searchContext.getProfilers());
             searchContext.queryResult().profileResults(shardResults);
         }
     }
@@ -130,9 +150,9 @@ public class QueryPhase implements SearchPhase {
      * wire everything (mapperService, etc.)
      * @return whether the rescoring phase should be executed
      */
-    static boolean execute(SearchContext searchContext,
-                           final IndexSearcher searcher,
-                           Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
+    static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException {
+        final ContextIndexSearcher searcher = searchContext.searcher();
+        SortAndFormats sortAndFormatsForRewrittenNumericSort = null;
         final IndexReader reader = searcher.getIndexReader();
         QuerySearchResult queryResult = searchContext.queryResult();
         queryResult.searchTimedOut(false);
@@ -204,6 +224,27 @@ public class QueryPhase implements SearchPhase {
                 hasFilterCollector = true;
             }
 
+            CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter = l -> {};
+            // try to rewrite numeric or date sort to the optimized distanceFeatureQuery
+            if ((searchContext.sort() != null) && SYS_PROP_REWRITE_SORT) {
+                Query rewrittenQuery = tryRewriteLongSort(searchContext, searcher.getIndexReader(), query, hasFilterCollector);
+                if (rewrittenQuery != null) {
+                    query = rewrittenQuery;
+                    // modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
+                    SortField[] oldSortFields = searchContext.sort().sort.getSort();
+                    DocValueFormat[] oldFormats = searchContext.sort().formats;
+                    SortField[] newSortFields = new SortField[oldSortFields.length + 1];
+                    DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
+                    newSortFields[0] = SortField.FIELD_SCORE;
+                    newFormats[0] = DocValueFormat.RAW;
+                    System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
+                    System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
+                    sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
+                    searchContext.sort(new SortAndFormats(new Sort(newSortFields), newFormats));
+                    leafSorter = createLeafSorter(oldSortFields[0]);
+                }
+            }
+
             boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
                 searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
 
@@ -243,53 +284,22 @@ public class QueryPhase implements SearchPhase {
             } else {
                 checkCancelled = null;
             }
+            searcher.setCheckCancelled(checkCancelled);
 
-            checkCancellationSetter.accept(checkCancelled);
-
-            // add cancellable
-            // this only performs segment-level cancellation, which is cheap and checked regardless of
-            // searchContext.lowLevelCancellation()
-            collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));
-
-            final boolean doProfile = searchContext.getProfilers() != null;
-            // create the top docs collector last when the other collectors are known
-            final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
-            // add the top docs collector, the first collector context in the chain
-            collectors.addFirst(topDocsFactory);
-
-            final Collector queryCollector;
-            if (doProfile) {
-                InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
-                searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
-                queryCollector = profileCollector;
+            boolean shouldRescore;
+            // if we are optimizing sort and there are no other collectors
+            if (sortAndFormatsForRewrittenNumericSort != null && collectors.size() == 0 && searchContext.getProfilers() == null) {
+                shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
             } else {
-               queryCollector = QueryCollectorContext.createQueryCollector(collectors);
+                shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
             }
 
-            try {
-                searcher.search(query, queryCollector);
-            } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
-                queryResult.terminatedEarly(true);
-            } catch (TimeExceededException e) {
-                assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
-
-                if (searchContext.request().allowPartialSearchResults() == false) {
-                    // Can't rethrow TimeExceededException because not serializable
-                    throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
-                }
-                queryResult.searchTimedOut(true);
-            } finally {
-                searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
-            }
-            if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
-                    && queryResult.terminatedEarly() == null) {
-                queryResult.terminatedEarly(false);
+            // if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
+            if (sortAndFormatsForRewrittenNumericSort != null) {
+                searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
+                restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
             }
 
-            final QuerySearchResult result = searchContext.queryResult();
-            for (QueryCollectorContext ctx : collectors) {
-                ctx.postProcess(result);
-            }
             ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
             if (executor instanceof QueueResizingEsThreadPoolExecutor) {
                 QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
@@ -298,14 +308,222 @@ public class QueryPhase implements SearchPhase {
             }
             if (searchContext.getProfilers() != null) {
                 ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
-                result.profileResults(shardResults);
+                queryResult.profileResults(shardResults);
             }
-            return topDocsFactory.shouldRescore();
+            return shouldRescore;
         } catch (Exception e) {
             throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
         }
     }
 
+    private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
+            LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
+        // create the top docs collector last when the other collectors are known
+        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
+        // add the top docs collector, the first collector context in the chain
+        collectors.addFirst(topDocsFactory);
+
+        final Collector queryCollector;
+        if (searchContext.getProfilers() != null) {
+            InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
+            searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
+            queryCollector = profileCollector;
+        } else {
+            queryCollector = QueryCollectorContext.createQueryCollector(collectors);
+        }
+        QuerySearchResult queryResult = searchContext.queryResult();
+        try {
+            searcher.search(query, queryCollector);
+        } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
+            queryResult.terminatedEarly(true);
+        } catch (TimeExceededException e) {
+            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
+            if (searchContext.request().allowPartialSearchResults() == false) {
+                // Can't rethrow TimeExceededException because not serializable
+                throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
+            }
+            queryResult.searchTimedOut(true);
+        } finally {
+            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
+        }
+        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
+            queryResult.terminatedEarly(false);
+        }
+        for (QueryCollectorContext ctx : collectors) {
+            ctx.postProcess(queryResult);
+        }
+        return topDocsFactory.shouldRescore();
+    }
+
+
+    /*
+     * We use collectorManager during sort optimization, where
+     * we have already checked that there are no other collectors, no filters,
+     * no search after, no scroll, no collapse, no track scores.
+     * Absence of all other collectors and parameters allows us to use TopFieldCollector directly.
+     */
+    private static boolean searchWithCollectorManager(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
+            CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter, boolean timeoutSet) throws IOException {
+        final IndexReader reader = searchContext.searcher().getIndexReader();
+        final int numHits = Math.min(searchContext.from() + searchContext.size(),  Math.max(1, reader.numDocs()));
+        final SortAndFormats sortAndFormats = searchContext.sort();
+
+        int totalHitsThreshold;
+        TotalHits totalHits;
+        if (searchContext.trackTotalHitsUpTo() == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
+            totalHitsThreshold = 1;
+            totalHits = new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
+        } else {
+            int hitCount = shortcutTotalHitCount(reader, query);
+            if (hitCount == -1) {
+                totalHitsThreshold = searchContext.trackTotalHitsUpTo();
+                totalHits = null; // will be computed via the collector
+            } else {
+                totalHitsThreshold = 1;
+                totalHits = new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO); // don't compute hit counts via the collector
+            }
+        }
+
+        CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager = TopFieldCollector.createSharedManager(
+            sortAndFormats.sort, numHits, null, totalHitsThreshold);
+
+        List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
+        leafSorter.accept(leaves);
+        try {
+            Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
+            searcher.search(leaves, weight, sharedManager, searchContext.queryResult(), sortAndFormats.formats, totalHits);
+        } catch (TimeExceededException e) {
+            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
+            if (searchContext.request().allowPartialSearchResults() == false) {
+                // Can't rethrow TimeExceededException because not serializable
+                throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
+            }
+            searchContext.queryResult().searchTimedOut(true);
+        } finally {
+            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
+        }
+        return false; // no rescoring when sorting by field
+    }
+
+    private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader reader,
+                                            Query query, boolean hasFilterCollector) throws IOException {
+        if (searchContext.searchAfter() != null) return null; //TODO: handle sort optimization with search after
+        if (searchContext.scrollContext() != null) return null;
+        if (searchContext.collapse() != null) return null;
+        if (searchContext.trackScores()) return null;
+        if (searchContext.aggregations() != null) return null;
+        Sort sort = searchContext.sort().sort;
+        SortField sortField = sort.getSort()[0];
+        if (SortField.Type.LONG.equals(IndexSortConfig.getSortFieldType(sortField)) == false) return null;
+
+        // check if this is a field of type Long or Date, that is indexed and has doc values
+        String fieldName = sortField.getField();
+        if (fieldName == null) return null; // happens when _score or _doc is the 1st sort field
+        if (searchContext.mapperService() == null) return null; // mapperService can be null in tests
+        final MappedFieldType fieldType = searchContext.mapperService().fullName(fieldName);
+        if (fieldType == null) return null; // for unmapped fields, default behaviour depending on "unmapped_type" flag
+        if ((fieldType.typeName().equals("long") == false) && (fieldType instanceof DateFieldType == false)) return null;
+        if (fieldType.indexOptions() == IndexOptions.NONE) return null; //TODO: change to pointDataDimensionCount() when implemented
+        if (fieldType.hasDocValues() == false) return null;
+
+
+        // check that all sorts are actual document fields or _doc
+        for (int i = 1; i < sort.getSort().length; i++) {
+            SortField sField = sort.getSort()[i];
+            String sFieldName = sField.getField();
+            if (sFieldName == null) {
+                if (SortField.FIELD_DOC.equals(sField) == false) return null;
+            } else {
+                //TODO: find out how to cover _script sort that don't use _score
+                if (searchContext.mapperService().fullName(sFieldName) == null) return null; // could be _script sort that uses _score
+            }
+        }
+
+        // check that setting of missing values allows optimization
+        if (sortField.getMissingValue() == null) return null;
+        Long missingValue = (Long) sortField.getMissingValue();
+        boolean missingValuesAccordingToSort = (sortField.getReverse() && (missingValue == Long.MIN_VALUE)) ||
+            ((sortField.getReverse() == false) && (missingValue == Long.MAX_VALUE));
+        if (missingValuesAccordingToSort == false) return null;
+
+        int docCount = PointValues.getDocCount(reader, fieldName);
+        // not worth running the optimization on a small index
+        if (docCount <= 512) return null;
+
+        // check for multiple values
+        if (PointValues.size(reader, fieldName) != docCount) return null; //TODO: handle multiple values
+
+        // check if the optimization makes sense with the track_total_hits setting
+        if (searchContext.trackTotalHitsUpTo() == Integer.MAX_VALUE) {
+            // with filter, we can't pre-calculate hitsCount, we need to explicitly calculate them => optimization doesn't make sense
+            if (hasFilterCollector) return null;
+            // if we can't pre-calculate hitsCount based on the query type, optimization doesn't make sense
+            if (shortcutTotalHitCount(reader, query) == -1) return null;
+        }
+
+        byte[] minValueBytes = PointValues.getMinPackedValue(reader, fieldName);
+        byte[] maxValueBytes = PointValues.getMaxPackedValue(reader, fieldName);
+        if ((maxValueBytes == null) || (minValueBytes == null)) return null;
+        long minValue = LongPoint.decodeDimension(minValueBytes, 0);
+        long maxValue = LongPoint.decodeDimension(maxValueBytes, 0);
+
+        Query rewrittenQuery;
+        if (minValue == maxValue) {
+            rewrittenQuery = new DocValuesFieldExistsQuery(fieldName);
+        } else {
+            if (indexFieldHasDuplicateData(reader, fieldName)) return null;
+            long origin = (sortField.getReverse()) ? maxValue : minValue;
+            long pivotDistance = (maxValue - minValue) >>> 1; // division by 2 on the unsigned representation to avoid overflow
+            if (pivotDistance == 0) { // 0 if maxValue = (minValue + 1)
+                pivotDistance = 1;
+            }
+            rewrittenQuery = LongPoint.newDistanceFeatureQuery(sortField.getField(), 1, origin, pivotDistance);
+        }
+        rewrittenQuery = new BooleanQuery.Builder()
+            .add(query, BooleanClause.Occur.FILTER) // filter for original query
+            .add(rewrittenQuery, BooleanClause.Occur.SHOULD) //should for rewrittenQuery
+            .build();
+        return rewrittenQuery;
+    }
+
+    /**
+     * Creates a sorter of {@link LeafReaderContext} that orders leaves depending on the minimum
+     * value and the sort order of the provided <code>sortField</code>.
+     */
+    static CheckedConsumer<List<LeafReaderContext>, IOException> createLeafSorter(SortField sortField) {
+        return leaves -> {
+            long[] sortValues = new long[leaves.size()];
+            long missingValue = (long) sortField.getMissingValue();
+            for (LeafReaderContext ctx : leaves) {
+                PointValues values = ctx.reader().getPointValues(sortField.getField());
+                if (values == null) {
+                    sortValues[ctx.ord] = missingValue;
+                } else {
+                    byte[] sortValue = sortField.getReverse() ? values.getMaxPackedValue(): values.getMinPackedValue();
+                    sortValues[ctx.ord] = sortValue == null ? missingValue : LongPoint.decodeDimension(sortValue, 0);
+                }
+            }
+            Comparator<LeafReaderContext> comparator = Comparator.comparingLong(l -> sortValues[l.ord]);
+            if (sortField.getReverse()) {
+                comparator = comparator.reversed();
+            }
+            Collections.sort(leaves, comparator);
+        };
+    }
+
+    /**
+     * Restore fieldsDocs to remove the first _score
+     */
+    private static void restoreTopFieldDocs(QuerySearchResult result, SortAndFormats originalSortAndFormats) {
+        TopDocs topDocs = result.topDocs().topDocs;
+        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+            FieldDoc fieldDoc = (FieldDoc) scoreDoc;
+            fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length);
+        }
+        TopFieldDocs newTopDocs = new TopFieldDocs(topDocs.totalHits, topDocs.scoreDocs, originalSortAndFormats.sort.getSort());
+        result.topDocs(new TopDocsAndMaxScore(newTopDocs, Float.NaN), originalSortAndFormats.formats);
+    }
+
     /**
      * Returns true if the provided <code>query</code> returns docs in index order (internal doc ids).
      * @param query The query to execute
@@ -341,5 +559,79 @@ public class QueryPhase implements SearchPhase {
         return true;
     }
 
+    /**
+     * Returns true if more than 50% of data in the index have the same value
+     * The evaluation is approximation based on finding the median value and estimating its count
+     */
+    static boolean indexFieldHasDuplicateData(IndexReader reader, String field) throws IOException {
+        long docsNoDupl = 0; // number of docs in segments with NO duplicate data that would benefit optimization
+        long docsDupl = 0; // number of docs in segments with duplicate data that would NOT benefit optimization
+        for (LeafReaderContext lrc : reader.leaves()) {
+            PointValues pointValues = lrc.reader().getPointValues(field);
+            if (pointValues == null) continue;
+            int docCount = pointValues.getDocCount();
+            if (docCount <= 512) { // skipping small segments as estimateMedianCount doesn't work well on them
+                continue;
+            }
+            assert(pointValues.size() == docCount); // TODO: modify the code to handle multiple values
+
+            int duplDocCount = docCount/2; // expected doc count of duplicate data
+            long minValue = LongPoint.decodeDimension(pointValues.getMinPackedValue(), 0);
+            long maxValue = LongPoint.decodeDimension(pointValues.getMaxPackedValue(), 0);
+            boolean hasDuplicateData = true;
+            while ((minValue < maxValue) && hasDuplicateData) {
+                long midValue = Math.floorDiv(minValue, 2) + Math.floorDiv(maxValue, 2); // to avoid overflow first divide each value by 2
+                long countLeft = estimatePointCount(pointValues, minValue, midValue);
+                long countRight = estimatePointCount(pointValues, midValue + 1, maxValue);
+                if ((countLeft >= countRight) && (countLeft > duplDocCount) ) {
+                    maxValue = midValue;
+                } else if ((countRight > countLeft) && (countRight > duplDocCount)) {
+                    minValue = midValue + 1;
+                } else {
+                    hasDuplicateData = false;
+                }
+            }
+            if (hasDuplicateData) {
+                docsDupl += docCount;
+            } else {
+                docsNoDupl += docCount;
+            }
+        }
+        return (docsDupl > docsNoDupl);
+    }
+
+
+    private static long estimatePointCount(PointValues pointValues, long minValue, long maxValue) {
+        final byte[] minValueAsBytes = new byte[Long.BYTES];
+        LongPoint.encodeDimension(minValue, minValueAsBytes, 0);
+        final byte[] maxValueAsBytes = new byte[Long.BYTES];
+        LongPoint.encodeDimension(maxValue, maxValueAsBytes, 0);
+
+        PointValues.IntersectVisitor visitor = new PointValues.IntersectVisitor() {
+            @Override
+            public void grow(int count) {}
+
+            @Override
+            public void visit(int docID) {}
+
+            @Override
+            public void visit(int docID, byte[] packedValue) {}
+
+            @Override
+            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
+                if (Arrays.compareUnsigned(minPackedValue, 0, Long.BYTES, maxValueAsBytes, 0, Long.BYTES) > 0 ||
+                    Arrays.compareUnsigned(maxPackedValue, 0, Long.BYTES, minValueAsBytes, 0, Long.BYTES) < 0) {
+                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
+                }
+                if (Arrays.compareUnsigned(minPackedValue, 0, Long.BYTES, minValueAsBytes, 0, Long.BYTES) < 0 ||
+                    Arrays.compareUnsigned(maxPackedValue, 0, Long.BYTES, maxValueAsBytes, 0, Long.BYTES) > 0) {
+                    return PointValues.Relation.CELL_CROSSES_QUERY;
+                }
+                return PointValues.Relation.CELL_INSIDE_QUERY;
+            }
+        };
+        return pointValues.estimatePointCount(visitor);
+    }
+
     private static class TimeExceededException extends RuntimeException {}
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java

@@ -414,8 +414,8 @@ abstract class TopDocsCollectorContext extends QueryCollectorContext {
      * @param hasFilterCollector True if the collector chain contains at least one collector that can filters document.
      */
     static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searchContext,
-                                                                 IndexReader reader,
                                                                  boolean hasFilterCollector) throws IOException {
+        final IndexReader reader = searchContext.searcher().getIndexReader();
         final Query query = searchContext.query();
         // top collectors don't like a size of 0
         final int totalNumDocs = Math.max(1, reader.numDocs());

+ 16 - 7
server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java

@@ -24,12 +24,13 @@ import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TotalHitCountCollector;
 import org.apache.lucene.store.Directory;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.TestUtil;
-import org.elasticsearch.search.query.CancellableCollector;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.AfterClass;
@@ -38,6 +39,8 @@ import org.junit.BeforeClass;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class SearchCancellationTests extends ESTestCase {
 
     static Directory dir;
@@ -75,12 +78,18 @@ public class SearchCancellationTests extends ESTestCase {
     public void testCancellableCollector() throws IOException {
         TotalHitCountCollector collector = new TotalHitCountCollector();
         AtomicBoolean cancelled = new AtomicBoolean();
-        CancellableCollector cancellableCollector = new CancellableCollector(cancelled::get, collector);
-        final LeafCollector leafCollector = cancellableCollector.getLeafCollector(reader.leaves().get(0));
-        leafCollector.collect(0);
+        ContextIndexSearcher searcher = new ContextIndexSearcher(reader,
+            IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy());
+        searcher.setCheckCancelled(() -> {
+            if (cancelled.get()) {
+                throw new TaskCancelledException("cancelled");
+            }
+        });
+        searcher.search(new MatchAllDocsQuery(), collector);
+        assertThat(collector.getTotalHits(), equalTo(reader.numDocs()));
         cancelled.set(true);
-        leafCollector.collect(1);
-        expectThrows(TaskCancelledException.class, () -> cancellableCollector.getLeafCollector(reader.leaves().get(1)));
+        expectThrows(TaskCancelledException.class,
+            () -> searcher.search(new MatchAllDocsQuery(), collector));
     }
 
 }

+ 238 - 83
server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java

@@ -24,21 +24,25 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.LatLonDocValuesField;
 import org.apache.lucene.document.LatLonPoint;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.queries.MinDocQuery;
+import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.FieldComparator;
@@ -50,9 +54,11 @@ import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TotalHitCountCollector;
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.Weight;
@@ -65,11 +71,16 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.action.search.SearchTask;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.query.ParsedQuery;
 import org.elasticsearch.index.search.ESToParentBlockJoinQuery;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.search.internal.ScrollContext;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.SortAndFormats;
@@ -80,10 +91,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.elasticsearch.search.query.QueryPhase.indexFieldHasDuplicateData;
+import static org.elasticsearch.search.query.TopDocsCollectorContext.hasInfMaxScore;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 public class QueryPhaseTests extends IndexShardTestCase {
 
@@ -107,18 +123,17 @@ public class QueryPhaseTests extends IndexShardTestCase {
     }
 
     private void countTestCase(Query query, IndexReader reader, boolean shouldCollectSearch, boolean shouldCollectCount) throws Exception {
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        ContextIndexSearcher searcher = shouldCollectSearch ? newContextSearcher(reader) :
+            newEarlyTerminationContextSearcher(reader, 0);
+        TestSearchContext context = new TestSearchContext(null, indexShard, searcher);
         context.parsedQuery(new ParsedQuery(query));
         context.setSize(0);
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
-
-        final IndexSearcher searcher = shouldCollectSearch ? new IndexSearcher(reader) :
-            getAssertingEarlyTerminationSearcher(reader, 0);
-
-        final boolean rescore = QueryPhase.execute(context, searcher, checkCancelled -> {});
+        final boolean rescore = QueryPhase.executeInternal(context);
         assertFalse(rescore);
-        IndexSearcher countSearcher = shouldCollectCount ? new IndexSearcher(reader) :
-            getAssertingEarlyTerminationSearcher(reader, 0);
+
+        ContextIndexSearcher countSearcher = shouldCollectCount ? newContextSearcher(reader) :
+            newEarlyTerminationContextSearcher(reader, 0);
         assertEquals(countSearcher.count(query), context.queryResult().topDocs().topDocs.totalHits.value);
     }
 
@@ -196,17 +211,17 @@ public class QueryPhaseTests extends IndexShardTestCase {
         w.close();
 
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context =
+            new TestSearchContext(null, indexShard, newEarlyTerminationContextSearcher(reader, 0));
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
 
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(1, context.queryResult().topDocs().topDocs.totalHits.value);
 
-        contextSearcher = new IndexSearcher(reader);
+        context.setSearcher(newContextSearcher(reader));
         context.parsedPostFilter(new ParsedQuery(new MatchNoDocsQuery()));
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(0, context.queryResult().topDocs().topDocs.totalHits.value);
         reader.close();
         dir.close();
@@ -226,15 +241,14 @@ public class QueryPhaseTests extends IndexShardTestCase {
         w.close();
 
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.terminateAfter(1);
         context.setSize(10);
         for (int i = 0; i < 10; i++) {
             context.parsedPostFilter(new ParsedQuery(new TermQuery(new Term("foo", Integer.toString(i)))));
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertEquals(1, context.queryResult().topDocs().topDocs.totalHits.value);
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
         }
@@ -253,27 +267,22 @@ public class QueryPhaseTests extends IndexShardTestCase {
         w.close();
 
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = getAssertingEarlyTerminationSearcher(reader, 0);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context =
+            new TestSearchContext(null, indexShard, newEarlyTerminationContextSearcher(reader, 0));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.setSize(0);
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(1, context.queryResult().topDocs().topDocs.totalHits.value);
 
-        contextSearcher = new IndexSearcher(reader);
         context.minimumScore(100);
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(0, context.queryResult().topDocs().topDocs.totalHits.value);
         reader.close();
         dir.close();
     }
 
     public void testQueryCapturesThreadPoolStats() throws Exception {
-        TestSearchContext context = new TestSearchContext(null, indexShard);
-        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
-        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
-
         Directory dir = newDirectory();
         IndexWriterConfig iwc = newIndexWriterConfig();
         RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
@@ -283,9 +292,11 @@ public class QueryPhaseTests extends IndexShardTestCase {
         }
         w.close();
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
+        context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
+        context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
 
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         QuerySearchResult results = context.queryResult();
         assertThat(results.serviceTimeEWMA(), greaterThanOrEqualTo(0L));
         assertThat(results.nodeQueueSize(), greaterThanOrEqualTo(0));
@@ -305,8 +316,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         }
         w.close();
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         ScrollContext scrollContext = new ScrollContext();
         scrollContext.lastEmittedDoc = null;
@@ -317,14 +327,14 @@ public class QueryPhaseTests extends IndexShardTestCase {
         int size = randomIntBetween(2, 5);
         context.setSize(size);
 
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
         assertNull(context.queryResult().terminatedEarly());
         assertThat(context.terminateAfter(), equalTo(0));
         assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs));
 
-        contextSearcher = getAssertingEarlyTerminationSearcher(reader, size);
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        context.setSearcher(newEarlyTerminationContextSearcher(reader, size));
+        QueryPhase.executeInternal(context);
         assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
         assertThat(context.terminateAfter(), equalTo(size));
         assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs));
@@ -350,19 +360,17 @@ public class QueryPhaseTests extends IndexShardTestCase {
             w.addDocument(doc);
         }
         w.close();
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        final IndexReader reader = DirectoryReader.open(dir);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
 
-        final IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-
         context.terminateAfter(numDocs);
         {
             context.setSize(10);
             TotalHitCountCollector collector = new TotalHitCountCollector();
             context.queryCollectors().put(TotalHitCountCollector.class, collector);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertFalse(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(10));
@@ -372,13 +380,13 @@ public class QueryPhaseTests extends IndexShardTestCase {
         context.terminateAfter(1);
         {
             context.setSize(1);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
 
             context.setSize(0);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(0));
@@ -386,7 +394,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
 
         {
             context.setSize(1);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
@@ -398,14 +406,14 @@ public class QueryPhaseTests extends IndexShardTestCase {
                 .add(new TermQuery(new Term("foo", "baz")), Occur.SHOULD)
                 .build();
             context.parsedQuery(new ParsedQuery(bq));
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
 
             context.setSize(0);
             context.parsedQuery(new ParsedQuery(bq));
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(0));
@@ -414,7 +422,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
             context.setSize(1);
             TotalHitCountCollector collector = new TotalHitCountCollector();
             context.queryCollectors().put(TotalHitCountCollector.class, collector);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
@@ -425,7 +433,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
             context.setSize(0);
             TotalHitCountCollector collector = new TotalHitCountCollector();
             context.queryCollectors().put(TotalHitCountCollector.class, collector);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertTrue(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(0));
@@ -455,15 +463,15 @@ public class QueryPhaseTests extends IndexShardTestCase {
         }
         w.close();
 
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        final IndexReader reader = DirectoryReader.open(dir);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
         context.setSize(1);
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW}));
 
-        final IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+
+        QueryPhase.executeInternal(context);
         assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
         assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
         assertThat(context.queryResult().topDocs().topDocs.scoreDocs[0], instanceOf(FieldDoc.class));
@@ -472,7 +480,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
 
         {
             context.parsedPostFilter(new ParsedQuery(new MinDocQuery(1)));
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo(numDocs - 1L));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
@@ -482,7 +490,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
 
             final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
             context.queryCollectors().put(TotalHitCountCollector.class, totalHitCountCollector);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
@@ -493,15 +501,15 @@ public class QueryPhaseTests extends IndexShardTestCase {
         }
 
         {
-            contextSearcher = getAssertingEarlyTerminationSearcher(reader, 1);
+            context.setSearcher(newEarlyTerminationContextSearcher(reader, 1));
             context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs[0], instanceOf(FieldDoc.class));
             assertThat(fieldDoc.fields[0], anyOf(equalTo(1), equalTo(2)));
 
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(1));
             assertThat(context.queryResult().topDocs().topDocs.scoreDocs[0], instanceOf(FieldDoc.class));
@@ -537,8 +545,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         // search sort is a prefix of the index sort
         searchSortAndFormats.add(new SortAndFormats(new Sort(indexSort.getSort()[0]), new DocValueFormat[]{DocValueFormat.RAW}));
         for (SortAndFormats searchSortAndFormat : searchSortAndFormats) {
-            IndexSearcher contextSearcher = new IndexSearcher(reader);
-            TestSearchContext context = new TestSearchContext(null, indexShard);
+            TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
             context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
             ScrollContext scrollContext = new ScrollContext();
             scrollContext.lastEmittedDoc = null;
@@ -549,7 +556,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
             context.setSize(10);
             context.sort(searchSortAndFormat);
 
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            QueryPhase.executeInternal(context);
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.terminateAfter(), equalTo(0));
@@ -557,8 +564,8 @@ public class QueryPhaseTests extends IndexShardTestCase {
             int sizeMinus1 = context.queryResult().topDocs().topDocs.scoreDocs.length - 1;
             FieldDoc lastDoc = (FieldDoc) context.queryResult().topDocs().topDocs.scoreDocs[sizeMinus1];
 
-            contextSearcher = getAssertingEarlyTerminationSearcher(reader, 10);
-            QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+            context.setSearcher(newEarlyTerminationContextSearcher(reader, 10));
+            QueryPhase.executeInternal(context);
             assertNull(context.queryResult().terminatedEarly());
             assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
             assertThat(context.terminateAfter(), equalTo(0));
@@ -579,7 +586,6 @@ public class QueryPhaseTests extends IndexShardTestCase {
         dir.close();
     }
 
-
     public void testDisableTopScoreCollection() throws Exception {
         Directory dir = newDirectory();
         IndexWriterConfig iwc = newIndexWriterConfig(new StandardAnalyzer());
@@ -597,8 +603,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         w.close();
 
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
         Query q = new SpanNearQuery.Builder("title", true)
             .addClause(new SpanTermQuery(new Term("title", "foo")))
@@ -608,21 +613,19 @@ public class QueryPhaseTests extends IndexShardTestCase {
         context.parsedQuery(new ParsedQuery(q));
         context.setSize(3);
         context.trackTotalHitsUpTo(3);
-
-        TopDocsCollectorContext topDocsContext =
-            TopDocsCollectorContext.createTopDocsCollectorContext(context, reader, false);
+        TopDocsCollectorContext topDocsContext = TopDocsCollectorContext.createTopDocsCollectorContext(context, false);
         assertEquals(topDocsContext.create(null).scoreMode(), org.apache.lucene.search.ScoreMode.COMPLETE);
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(5, context.queryResult().topDocs().topDocs.totalHits.value);
         assertEquals(context.queryResult().topDocs().topDocs.totalHits.relation, TotalHits.Relation.EQUAL_TO);
         assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(3));
 
+
         context.sort(new SortAndFormats(new Sort(new SortField("other", SortField.Type.INT)),
             new DocValueFormat[] { DocValueFormat.RAW }));
-        topDocsContext =
-            TopDocsCollectorContext.createTopDocsCollectorContext(context, reader, false);
+        topDocsContext = TopDocsCollectorContext.createTopDocsCollectorContext(context, false);
         assertEquals(topDocsContext.create(null).scoreMode(), org.apache.lucene.search.ScoreMode.COMPLETE_NO_SCORES);
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(5, context.queryResult().topDocs().topDocs.totalHits.value);
         assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(3));
         assertEquals(context.queryResult().topDocs().topDocs.totalHits.relation, TotalHits.Relation.EQUAL_TO);
@@ -631,13 +634,108 @@ public class QueryPhaseTests extends IndexShardTestCase {
         dir.close();
     }
 
+    public void testNumericLongOrDateSortOptimization() throws Exception {
+        final String fieldNameLong = "long-field";
+        final String fieldNameDate = "date-field";
+        MappedFieldType fieldTypeLong = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+        MappedFieldType fieldTypeDate = new DateFieldMapper.Builder(fieldNameDate).fieldType();
+        MapperService mapperService = mock(MapperService.class);
+        when(mapperService.fullName(fieldNameLong)).thenReturn(fieldTypeLong);
+        when(mapperService.fullName(fieldNameDate)).thenReturn(fieldTypeDate);
+
+        final int numDocs = 7000;
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null));
+        for (int i = 1; i <= numDocs; ++i) {
+            Document doc = new Document();
+            long longValue = randomLongBetween(-10000000L, 10000000L);
+            doc.add(new LongPoint(fieldNameLong, longValue));
+            doc.add(new NumericDocValuesField(fieldNameLong, longValue));
+            longValue = randomLongBetween(0, 3000000000000L);
+            doc.add(new LongPoint(fieldNameDate, longValue));
+            doc.add(new NumericDocValuesField(fieldNameDate, longValue));
+            writer.addDocument(doc);
+            if (i % 3500 == 0) writer.commit();
+        }
+        writer.close();
+        final IndexReader reader = DirectoryReader.open(dir);
+
+        TestSearchContext searchContext =
+            spy(new TestSearchContext(null, indexShard, newOptimizedContextSearcher(reader, 0)));
+        when(searchContext.mapperService()).thenReturn(mapperService);
+
+        // 1. Test a sort on long field
+        final SortField sortFieldLong = new SortField(fieldNameLong, SortField.Type.LONG);
+        sortFieldLong.setMissingValue(Long.MAX_VALUE);
+        final Sort longSort = new Sort(sortFieldLong);
+        SortAndFormats sortAndFormats = new SortAndFormats(longSort, new DocValueFormat[]{DocValueFormat.RAW});
+        searchContext.sort(sortAndFormats);
+        searchContext.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
+        searchContext.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
+        searchContext.setSize(10);
+        QueryPhase.executeInternal(searchContext);
+        assertSortResults(searchContext.queryResult().topDocs().topDocs, (long) numDocs, false);
+
+        // 2. Test a sort on long field + date field
+        final SortField sortFieldDate = new SortField(fieldNameDate, SortField.Type.LONG);
+        DocValueFormat dateFormat = fieldTypeDate.docValueFormat(null, null);
+        final Sort longDateSort = new Sort(sortFieldLong, sortFieldDate);
+        sortAndFormats = new SortAndFormats(longDateSort, new DocValueFormat[]{DocValueFormat.RAW, dateFormat});
+        searchContext.sort(sortAndFormats);
+        QueryPhase.executeInternal(searchContext);
+        assertSortResults(searchContext.queryResult().topDocs().topDocs, (long) numDocs, true);
+
+        // 3. Test a sort on date field
+        sortFieldDate.setMissingValue(Long.MAX_VALUE);
+        final Sort dateSort = new Sort(sortFieldDate);
+        sortAndFormats = new SortAndFormats(dateSort, new DocValueFormat[]{dateFormat});
+        searchContext.sort(sortAndFormats);
+        QueryPhase.executeInternal(searchContext);
+        assertSortResults(searchContext.queryResult().topDocs().topDocs, (long) numDocs, false);
+
+        // 4. Test a sort on date field + long field
+        final Sort dateLongSort = new Sort(sortFieldDate, sortFieldLong);
+        sortAndFormats = new SortAndFormats(dateLongSort, new DocValueFormat[]{dateFormat, DocValueFormat.RAW});
+        searchContext.sort(sortAndFormats);
+        QueryPhase.executeInternal(searchContext);
+        assertSortResults(searchContext.queryResult().topDocs().topDocs, (long) numDocs, true);
+        reader.close();
+        dir.close();
+    }
+
+    public void testIndexHasDuplicateData() throws IOException {
+        int docsCount = 7000;
+        int duplIndex = docsCount * 7 / 10;
+        int duplIndex2 = docsCount * 3 / 10;
+        long duplicateValue = randomLongBetween(-10000000L, 10000000L);
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null));
+        for (int docId = 0; docId < docsCount; docId++) {
+            Document doc = new Document();
+            long rndValue = randomLongBetween(-10000000L, 10000000L);
+            long value = (docId < duplIndex) ? duplicateValue : rndValue;
+            long value2 = (docId < duplIndex2) ? duplicateValue : rndValue;
+            doc.add(new LongPoint("duplicateField", value));
+            doc.add(new LongPoint("notDuplicateField", value2));
+            writer.addDocument(doc);
+        }
+        writer.close();
+        final IndexReader reader = DirectoryReader.open(dir);
+        boolean hasDuplicateData = indexFieldHasDuplicateData(reader, "duplicateField");
+        boolean hasDuplicateData2 = indexFieldHasDuplicateData(reader, "notDuplicateField");
+        reader.close();
+        dir.close();
+        assertTrue(hasDuplicateData);
+        assertFalse(hasDuplicateData2);
+    }
+
     public void testMaxScoreQueryVisitor() {
         BitSetProducer producer = context -> new FixedBitSet(1);
         Query query = new ESToParentBlockJoinQuery(new MatchAllDocsQuery(), producer, ScoreMode.Avg, "nested");
-        assertTrue(TopDocsCollectorContext.hasInfMaxScore(query));
+        assertTrue(hasInfMaxScore(query));
 
         query = new ESToParentBlockJoinQuery(new MatchAllDocsQuery(), producer, ScoreMode.None, "nested");
-        assertFalse(TopDocsCollectorContext.hasInfMaxScore(query));
+        assertFalse(hasInfMaxScore(query));
 
 
         for (Occur occur : Occur.values()) {
@@ -645,9 +743,9 @@ public class QueryPhaseTests extends IndexShardTestCase {
                 .add(new ESToParentBlockJoinQuery(new MatchAllDocsQuery(), producer, ScoreMode.Avg, "nested"), occur)
                 .build();
             if (occur == Occur.MUST) {
-                assertTrue(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertTrue(hasInfMaxScore(query));
             } else {
-                assertFalse(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertFalse(hasInfMaxScore(query));
             }
 
             query = new BooleanQuery.Builder()
@@ -656,9 +754,9 @@ public class QueryPhaseTests extends IndexShardTestCase {
                     .build(), occur)
                 .build();
             if (occur == Occur.MUST) {
-                assertTrue(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertTrue(hasInfMaxScore(query));
             } else {
-                assertFalse(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertFalse(hasInfMaxScore(query));
             }
 
             query = new BooleanQuery.Builder()
@@ -666,7 +764,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
                     .add(new ESToParentBlockJoinQuery(new MatchAllDocsQuery(), producer, ScoreMode.Avg, "nested"), occur)
                     .build(), Occur.FILTER)
                 .build();
-            assertFalse(TopDocsCollectorContext.hasInfMaxScore(query));
+            assertFalse(hasInfMaxScore(query));
 
             query = new BooleanQuery.Builder()
                 .add(new BooleanQuery.Builder()
@@ -675,13 +773,33 @@ public class QueryPhaseTests extends IndexShardTestCase {
                     .build(), occur)
                 .build();
             if (occur == Occur.MUST) {
-                assertTrue(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertTrue(hasInfMaxScore(query));
             } else {
-                assertFalse(TopDocsCollectorContext.hasInfMaxScore(query));
+                assertFalse(hasInfMaxScore(query));
             }
         }
     }
 
+    // asserts that the score docs are sorted as expected and that the total hit count matches
+    private void assertSortResults(TopDocs topDocs, long expectedNumDocs, boolean isDoubleSort) {
+        assertEquals(expectedNumDocs, topDocs.totalHits.value);
+        long cur1, cur2;
+        long prev1 = Long.MIN_VALUE;
+        long prev2 = Long.MIN_VALUE;
+        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+            cur1 = (long) ((FieldDoc) scoreDoc).fields[0];
+            assertThat(cur1, greaterThanOrEqualTo(prev1)); // docs must be sorted on the primary sort field
+            if (isDoubleSort) {
+                cur2 = (long) ((FieldDoc) scoreDoc).fields[1];
+                if (cur1 == prev1) {
+                    assertThat(cur2, greaterThanOrEqualTo(prev2)); // ties on the primary field must be sorted on the secondary field
+                }
+                prev2 = cur2;
+            }
+            prev1 = cur1;
+        }
+    }
+
     public void testMinScore() throws Exception {
         Directory dir = newDirectory();
         IndexWriterConfig iwc = newIndexWriterConfig();
@@ -695,8 +813,7 @@ public class QueryPhaseTests extends IndexShardTestCase {
         w.close();
 
         IndexReader reader = DirectoryReader.open(dir);
-        IndexSearcher contextSearcher = new IndexSearcher(reader);
-        TestSearchContext context = new TestSearchContext(null, indexShard);
+        TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
         context.parsedQuery(new ParsedQuery(
             new BooleanQuery.Builder()
                 .add(new TermQuery(new Term("foo", "bar")), Occur.MUST)
@@ -708,23 +825,61 @@ public class QueryPhaseTests extends IndexShardTestCase {
         context.setSize(1);
         context.trackTotalHitsUpTo(5);
 
-        QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
+        QueryPhase.executeInternal(context);
         assertEquals(10, context.queryResult().topDocs().topDocs.totalHits.value);
 
         reader.close();
         dir.close();
+
     }
 
-    private static IndexSearcher getAssertingEarlyTerminationSearcher(IndexReader reader, int size) {
-        return new IndexSearcher(reader) {
+    private static ContextIndexSearcher newContextSearcher(IndexReader reader) {
+        return new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
+            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy());
+    }
+
+    private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexReader reader, int size) {
+        return new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
+            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
+
             @Override
-            protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
+            public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
                 final Collector in = new AssertingEarlyTerminationFilterCollector(collector, size);
                 super.search(leaves, weight, in);
             }
         };
     }
 
+    // used to check that the numeric long/date sort optimization was applied, by asserting on the shape of the rewritten query
+    private static ContextIndexSearcher newOptimizedContextSearcher(IndexReader reader, int queryType) {
+        return new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
+            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
+
+            @Override
+            public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager,
+                    QuerySearchResult result, DocValueFormat[] formats, TotalHits totalHits) throws IOException {
+                final Query query = weight.getQuery();
+                assertTrue(query instanceof BooleanQuery);
+                List<BooleanClause> clauses = ((BooleanQuery) query).clauses();
+                assertEquals(2, clauses.size());
+                assertEquals(Occur.FILTER, clauses.get(0).getOccur());
+                assertEquals(Occur.SHOULD, clauses.get(1).getOccur());
+                if (queryType == 0) {
+                    assertEquals(
+                        LongPoint.newDistanceFeatureQuery("random_field", 1, 1, 1).getClass(),
+                        clauses.get(1).getQuery().getClass());
+                }
+                if (queryType == 1) assertTrue(clauses.get(1).getQuery() instanceof DocValuesFieldExistsQuery);
+                super.search(leaves, weight, manager, result, formats, totalHits);
+            }
+
+            @Override
+            public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) {
+                assert false : "should be unreachable: the optimization is expected to search with a CollectorManager";
+            }
+        };
+    }
+
     private static class AssertingEarlyTerminationFilterCollector extends FilterCollector {
         private final int size;
 

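newOptimizedContextSearcher() pins down the shape of the rewritten query: the original query demoted to a FILTER clause, plus a SHOULD-scored distance feature query on the sort field. A hedged sketch of that rewrite for a one-dimensional long field (the helper name, origin, and pivot below are placeholders; the real rewrite lives in QueryPhase and also handles date fields, missing values, and sort order):

```
// Hedged sketch of the query shape asserted above. The original query is
// demoted to a FILTER clause (matching only, no scores), while the SHOULD
// clause scores docs higher the closer their value is to `origin`, letting a
// top-N collector terminate early instead of visiting every match.
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;

class LongSortRewriteSketch {
    static Query rewrite(Query original, String sortField, long origin, long pivotDistance) {
        return new BooleanQuery.Builder()
            .add(original, Occur.FILTER)
            .add(LongPoint.newDistanceFeatureQuery(sortField, 1, origin, pivotDistance), Occur.SHOULD)
            .build();
    }
}
```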
+ 49 - 0
server/src/test/java/org/elasticsearch/search/sort/FieldSortIT.java

@@ -24,6 +24,7 @@ import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.UnicodeUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
@@ -80,8 +81,10 @@ import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -1841,4 +1844,50 @@ public class FieldSortIT extends ESIntegTestCase {
             }
         }
     }
+
+    public void testLongSortOptimizationCorrectResults() {
+        assertAcked(prepareCreate("test1")
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2))
+            .addMapping("_doc", "long_field", "type=long").get());
+
+        BulkRequestBuilder bulkBuilder = client().prepareBulk();
+        for (int i = 1; i <= 7000; i++) {
+            String source = "{\"long_field\":" + randomLong() + "}";
+            bulkBuilder.add(client().prepareIndex("test1").setId(Integer.toString(i)).setSource(source, XContentType.JSON));
+            if (i % 3500 == 0) { // flush in batches of 3500; the last batch is executed at i == 7000
+                bulkBuilder.get();
+                bulkBuilder = client().prepareBulk();
+            }
+        }
+        refresh();
+
+        // 1. sort DESC on long_field
+        SearchResponse searchResponse = client().prepareSearch()
+            .addSort(new FieldSortBuilder("long_field").order(SortOrder.DESC))
+            .setSize(10).get();
+        assertSearchResponse(searchResponse);
+        long previousLong = Long.MAX_VALUE;
+        for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
+            // check the correct sort order
+            SearchHit hit = searchResponse.getHits().getHits()[i];
+            long currentLong = (long) hit.getSortValues()[0];
+            assertThat("sort order is incorrect", currentLong, lessThanOrEqualTo(previousLong));
+            previousLong = currentLong;
+        }
+
+        // 2. sort ASC on long_field
+        searchResponse = client().prepareSearch()
+            .addSort(new FieldSortBuilder("long_field").order(SortOrder.ASC))
+            .setSize(10).get();
+        assertSearchResponse(searchResponse);
+        previousLong = Long.MIN_VALUE;
+        for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
+            // check the correct sort order
+            SearchHit hit = searchResponse.getHits().getHits()[i];
+            long currentLong = (long) hit.getSortValues()[0];
+            assertThat("sort order is incorrect", currentLong, greaterThanOrEqualTo(previousLong));
+            previousLong = currentLong;
+        }
+    }
+
 }

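The ascending and descending checks above differ only in the comparison direction. A hedged helper that factors the loop out (the helper name is hypothetical; it assumes the same SearchHit sort-values API the test uses):

```
// Hedged sketch: one order check for both sort directions. Pass
// Comparator.naturalOrder() for ASC and Comparator.reverseOrder() for DESC.
import java.util.Comparator;
import org.elasticsearch.search.SearchHit;

class SortOrderCheck {
    static void assertSortedBySortValue(SearchHit[] hits, Comparator<Long> order) {
        for (int i = 1; i < hits.length; i++) {
            long prev = (long) hits[i - 1].getSortValues()[0];
            long curr = (long) hits[i].getSortValues()[0];
            if (order.compare(prev, curr) > 0) { // prev must not come after curr
                throw new AssertionError("sort order is incorrect at hit " + i);
            }
        }
    }
}
```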
+ 9 - 0
test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java

@@ -106,11 +106,20 @@ public class TestSearchContext extends SearchContext {
     }
 
     public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard) {
+        this(queryShardContext, indexShard, null);
+    }
+
+    public TestSearchContext(QueryShardContext queryShardContext, IndexShard indexShard, ContextIndexSearcher searcher) {
         this.bigArrays = null;
         this.indexService = null;
         this.fixedBitSetFilterCache = null;
         this.indexShard = indexShard;
         this.queryShardContext = queryShardContext;
+        this.searcher = searcher;
+    }
+
+    public void setSearcher(ContextIndexSearcher searcher) {
+        this.searcher = searcher;
     }
 
     @Override
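For reference, this is how the tests above wire the new hooks together: seed the context with a searcher at construction, then swap searchers between query phase executions (a usage sketch reusing the helper names defined in QueryPhaseTests):

```
// Usage sketch, mirroring the tests above: the third constructor argument
// seeds the searcher; setSearcher() swaps it between executions.
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader));
QueryPhase.executeInternal(context);

// rerun with a searcher that asserts early termination after 10 docs
context.setSearcher(newEarlyTerminationContextSearcher(reader, 10));
QueryPhase.executeInternal(context);
```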