Explorar o código

Add the ability to use the breadth_first mode with nested aggregations (such as `top_hits`) which require access to score information.
The score is recomputed lazily for each document belonging to a top bucket.
Relates to #9825

Jim Ferenczi %!s(int64=9) %!d(string=hai) anos
pai
achega
052191f2a2

+ 0 - 25
core/src/main/java/org/elasticsearch/common/lucene/Lucene.java

@@ -638,31 +638,6 @@ public class Lucene {
         }
     }
 
-    /**
-     * Return a Scorer that throws an ElasticsearchIllegalStateException
-     * on all operations with the given message.
-     */
-    public static Scorer illegalScorer(final String message) {
-        return new Scorer(null) {
-            @Override
-            public float score() throws IOException {
-                throw new IllegalStateException(message);
-            }
-            @Override
-            public int freq() throws IOException {
-                throw new IllegalStateException(message);
-            }
-            @Override
-            public int docID() {
-                throw new IllegalStateException(message);
-            }
-            @Override
-            public DocIdSetIterator iterator() {
-                throw new IllegalStateException(message);
-            }
-        };
-    }
-
     private static final class CommitPoint extends IndexCommit {
         private String segmentsFileName;
         private final Collection<String> files;

+ 1 - 1
core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

@@ -165,7 +165,7 @@ public abstract class AggregatorBase extends Aggregator {
     public DeferringBucketCollector getDeferringCollector() {
         // Default impl is a collector that selects the best buckets
         // but an alternative defer policy may be based on best docs.
-        return new BestBucketsDeferringCollector();
+        return new BestBucketsDeferringCollector(context());
     }
 
     /**

+ 27 - 6
core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

@@ -20,6 +20,9 @@
 package org.elasticsearch.search.aggregations.bucket;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.packed.PackedInts;
 import org.apache.lucene.util.packed.PackedLongValues;
 import org.elasticsearch.common.lucene.Lucene;
@@ -30,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,6 +60,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
 
     final List<Entry> entries = new ArrayList<>();
     BucketCollector collector;
+    final AggregationContext aggContext;
     LeafReaderContext context;
     PackedLongValues.Builder docDeltas;
     PackedLongValues.Builder buckets;
@@ -64,7 +69,8 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
     LongHash selectedBuckets;
 
     /** Sole constructor. */
-    public BestBucketsDeferringCollector() {
+    public BestBucketsDeferringCollector(AggregationContext context) {
+        this.aggContext = context;
     }
 
     @Override
@@ -139,19 +145,34 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
         this.selectedBuckets = hash;
 
         collector.preCollection();
-        if (collector.needsScores()) {
-            throw new IllegalStateException("Cannot defer if scores are needed");
+        boolean needsScores = collector.needsScores();
+        Weight weight = null;
+        if (needsScores) {
+            weight = aggContext.searchContext().searcher()
+                        .createNormalizedWeight(aggContext.searchContext().query(), true);
         }
-
         for (Entry entry : entries) {
             final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
-            leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST
-                    + " collection mode is that scores cannot be buffered along with document IDs"));
+            DocIdSetIterator docIt = null;
+            if (needsScores && entry.docDeltas.size() > 0) {
+                Scorer scorer = weight.scorer(entry.context);
+                // We don't need to check if the scorer is null
+                // since we are sure that there are documents to replay (entry.docDeltas it not empty).
+                docIt = scorer.iterator();
+                leafCollector.setScorer(scorer);
+            }
             final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
             final PackedLongValues.Iterator buckets = entry.buckets.iterator();
             int doc = 0;
             for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
                 doc += docDeltaIterator.next();
+                if (needsScores) {
+                    if (docIt.docID() < doc) {
+                        docIt.advance(doc);
+                    }
+                    // aggregations should only be replayed on matching documents
+                    assert docIt.docID() == doc;
+                }
                 final long bucket = buckets.next();
                 final long rebasedBucket = hash.find(bucket);
                 if (rebasedBucket != -1) {

+ 0 - 1
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java

@@ -199,7 +199,6 @@ public abstract class TermsAggregator extends BucketsAggregator {
     @Override
     protected boolean shouldDefer(Aggregator aggregator) {
         return collectMode == SubAggCollectionMode.BREADTH_FIRST
-                && aggregator.needsScores() == false
                 && !aggsUsedForSorting.contains(aggregator);
     }
 

+ 33 - 2
core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java

@@ -351,8 +351,7 @@ public class TopHitsIT extends ESIntegTestCase {
     }
 
 
-    public void testBreadthFirst() throws Exception {
-        // breadth_first will be ignored since we need scores
+    public void testBreadthFirstWithScoreNeeded() throws Exception {
         SearchResponse response = client().prepareSearch("idx").setTypes("type")
                 .addAggregation(terms("terms")
                         .executionHint(randomExecutionHint())
@@ -382,6 +381,38 @@ public class TopHitsIT extends ESIntegTestCase {
         }
     }
 
+    public void testBreadthFirstWithAggOrderAndScoreNeeded() throws Exception {
+        SearchResponse response = client().prepareSearch("idx").setTypes("type")
+            .addAggregation(terms("terms")
+                .executionHint(randomExecutionHint())
+                .collectMode(SubAggCollectionMode.BREADTH_FIRST)
+                .field(TERMS_AGGS_FIELD)
+                .order(Terms.Order.aggregation("max", false))
+                .subAggregation(max("max").field(SORT_FIELD))
+                .subAggregation(topHits("hits").size(3))
+            ).get();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        assertThat(terms.getBuckets().size(), equalTo(5));
+        int id = 4;
+        for (Terms.Bucket bucket : terms.getBuckets()) {
+            assertThat(bucket, notNullValue());
+            assertThat(key(bucket), equalTo("val" + id));
+            assertThat(bucket.getDocCount(), equalTo(10L));
+            TopHits topHits = bucket.getAggregations().get("hits");
+            SearchHits hits = topHits.getHits();
+            assertThat(hits.totalHits(), equalTo(10L));
+            assertThat(hits.getHits().length, equalTo(3));
+
+            assertThat(hits.getAt(0).sourceAsMap().size(), equalTo(4));
+            id --;
+        }
+    }
+
     public void testBasicsGetProperty() throws Exception {
         SearchResponse searchResponse = client().prepareSearch("idx").setQuery(matchAllQuery())
                 .addAggregation(global("global").subAggregation(topHits("hits"))).execute().actionGet();

+ 1 - 1
core/src/test/java/org/elasticsearch/search/profile/ProfileTests.java

@@ -164,7 +164,7 @@ public class ProfileTests extends ESTestCase {
         final LeafCollector leafCollector = profileCollector.getLeafCollector(reader.leaves().get(0));
         assertThat(profileCollector.getTime(), greaterThan(0L));
         long time = profileCollector.getTime();
-        leafCollector.setScorer(Lucene.illegalScorer("dummy scorer"));
+        leafCollector.setScorer(null);
         assertThat(profileCollector.getTime(), greaterThan(time));
         time = profileCollector.getTime();
         leafCollector.collect(0);

+ 2 - 3
docs/reference/aggregations/bucket/terms-aggregation.asciidoc

@@ -635,9 +635,8 @@ elasticsearch will always use the `depth_first` collect_mode unless explicitly i
 Note that the `order` parameter can still be used to refer to data from a child aggregation when using the `breadth_first` setting - the parent
 aggregation understands that this child aggregation will need to be called first before any of the other child aggregations.
 
-WARNING: It is not possible to nest aggregations such as `top_hits` which require access to match score information under an aggregation that uses
-the `breadth_first` collection mode. This is because this would require a RAM buffer to hold the float score value for every document and
-this would typically be too costly in terms of RAM.
+WARNING: Nested aggregations such as `top_hits` which require access to score information under an aggregation that uses the `breadth_first`
+collection mode need to replay the query on the second pass but only for the documents belonging to the top buckets.
 
 [[search-aggregations-bucket-terms-aggregation-execution-hint]]
 ==== Execution hint