Browse Source

Fix profiled global agg (#71575)

This fixes the `global` aggregator when `profile` is enabled. It does so
by removing all of the special case handling for `global` aggs in
`AggregationPhase` and having the global aggregator itself perform the
scoped collection using the same trick that we use in filter-by-filter
mode of the `filters` aggregation.

Closes #71098
Nik Everett 4 years ago
parent
commit
57e6c78a52

+ 5 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java

@@ -252,6 +252,11 @@ public class AggConstructionContentionBenchmark {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public Query filterQuery(Query query) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public IndexSettings getIndexSettings() {
             throw new UnsupportedOperationException();

+ 64 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/380_global.yml

@@ -0,0 +1,64 @@
+setup:
+  - do:
+     bulk:
+        refresh: true
+        index: test
+        body:
+          - '{"index": {}}'
+          - '{"name": "one"}'
+          - '{"index": {}}'
+          - '{"name": "two"}'
+          - '{"index": {}}'
+          - '{"name": "two"}'
+
+---
+simple:
+  - do:
+      search:
+        index: test
+        body:
+          size: 0
+          query:
+            match:
+              name: two
+          aggs:
+            g:
+              global: {}
+              aggs:
+                t:
+                  terms:
+                    field: name.keyword
+
+  - match: { aggregations.g.doc_count: 3 }
+  - length: { aggregations.g.t.buckets: 2 }
+  - match: { aggregations.g.t.buckets.0.key: two }
+  - match: { aggregations.g.t.buckets.1.key: one }
+
+---
+profile:
+  - skip:
+      version: " - 7.99.99"
+      reason:  fixed in 8.0.0 (to be backported to 7.13.0)
+
+  - do:
+      search:
+        index: test
+        body:
+          profile: true
+          size: 0
+          query:
+            match:
+              name: two
+          aggs:
+            g:
+              global: {}
+              aggs:
+                t:
+                  terms:
+                    field: name.keyword
+
+  - match: { aggregations.g.doc_count: 3 }
+  - length: { aggregations.g.t.buckets: 2 }
+  - match: { aggregations.g.t.buckets.0.key: two }
+  - match: { aggregations.g.t.buckets.1.key: one }
+  - match: { profile.shards.0.aggregations.0.description: g }

+ 2 - 1
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -967,7 +967,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                 context.bitsetFilterCache(),
                 context.indexShard().shardId().hashCode(),
                 context::getRelativeTimeInMillis,
-                context::isCancelled
+                context::isCancelled,
+                context::buildFilteredQuery
             );
             context.addReleasable(aggContext);
             try {

+ 13 - 60
server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

@@ -8,18 +8,13 @@
 package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.Query;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.lucene.search.Queries;
-import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.profile.query.CollectorResult;
 import org.elasticsearch.search.profile.query.InternalProfileCollector;
-import org.elasticsearch.search.query.QueryPhaseExecutionException;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -32,31 +27,20 @@ public class AggregationPhase {
     }
 
     public void preProcess(SearchContext context) {
-        if (context.aggregations() != null) {
-            List<Aggregator> collectors = new ArrayList<>();
-            Aggregator[] aggregators;
-            try {
-                aggregators = context.aggregations().factories().createTopLevelAggregators();
-                for (int i = 0; i < aggregators.length; i++) {
-                    if (aggregators[i] instanceof GlobalAggregator == false) {
-                        collectors.add(aggregators[i]);
-                    }
-                }
-                context.aggregations().aggregators(aggregators);
-                if (collectors.isEmpty() == false) {
-                    Collector collector = MultiBucketCollector.wrap(true, collectors);
-                    ((BucketCollector)collector).preCollection();
-                    if (context.getProfilers() != null) {
-                        collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
-                                // TODO: report on child aggs as well
-                                Collections.emptyList());
-                    }
-                    context.queryCollectors().put(AggregationPhase.class, collector);
-                }
-            } catch (IOException e) {
-                throw new AggregationInitializationException("Could not initialize aggregators", e);
-            }
+        if (context.aggregations() == null) {
+            return;
+        }
+        BucketCollector bucketCollector;
+        try {
+            context.aggregations().aggregators(context.aggregations().factories().createTopLevelAggregators());
+            bucketCollector = MultiBucketCollector.wrap(true, List.of(context.aggregations().aggregators()));
+            bucketCollector.preCollection();
+        } catch (IOException e) {
+            throw new AggregationInitializationException("Could not initialize aggregators", e);
         }
+        Collector collector = context.getProfilers() == null ?
+            bucketCollector : new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, List.of());
+        context.queryCollectors().put(AggregationPhase.class, collector);
     }
 
     public void execute(SearchContext context) {
@@ -71,37 +55,6 @@ public class AggregationPhase {
         }
 
         Aggregator[] aggregators = context.aggregations().aggregators();
-        List<Aggregator> globals = new ArrayList<>();
-        for (int i = 0; i < aggregators.length; i++) {
-            if (aggregators[i] instanceof GlobalAggregator) {
-                globals.add(aggregators[i]);
-            }
-        }
-
-        // optimize the global collector based execution
-        if (globals.isEmpty() == false) {
-            BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals);
-            Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
-
-            try {
-                final Collector collector;
-                if (context.getProfilers() == null) {
-                    collector = globalsCollector;
-                } else {
-                    InternalProfileCollector profileCollector = new InternalProfileCollector(
-                            globalsCollector, CollectorResult.REASON_AGGREGATION_GLOBAL,
-                            // TODO: report on sub collectors
-                            Collections.emptyList());
-                    collector = profileCollector;
-                    // start a new profile with this collector
-                    context.getProfilers().addQueryProfiler().setCollector(profileCollector);
-                }
-                globalsCollector.preCollection();
-                context.searcher().search(query, collector);
-            } catch (Exception e) {
-                throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e);
-            }
-        }
 
         List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
         if (context.aggregations().factories().context() != null) {

+ 24 - 8
server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java

@@ -8,11 +8,15 @@
 package org.elasticsearch.search.aggregations.bucket.global;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.BulkScorer;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.Weight;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.CardinalityUpperBound;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
 import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
@@ -21,22 +25,34 @@ import java.io.IOException;
 import java.util.Map;
 
 public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator {
+    private final Weight weight;
 
     public GlobalAggregator(String name, AggregatorFactories subFactories, AggregationContext context, Map<String, Object> metadata)
         throws IOException {
+
         super(name, subFactories, context, null, CardinalityUpperBound.ONE, metadata);
+        weight = context.filterQuery(new MatchAllDocsQuery()).createWeight(context.searcher(), scoreMode(), 1.0f);
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        return new LeafBucketCollectorBase(sub, null) {
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        // Run sub-aggregations on child documents
+        BulkScorer scorer = weight.bulkScorer(ctx);
+        if (scorer == null) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        scorer.score(new LeafCollector() {
+            @Override
+            public void collect(int doc) throws IOException {
+                collectBucket(sub, doc, 0);
+            }
+
             @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0 : "global aggregator can only be a top level aggregator";
-                collectBucket(sub, doc, bucket);
+            public void setScorer(Scorable scorer) throws IOException {
+                sub.setScorer(scorer);
             }
-        };
+        }, ctx.reader().getLiveDocs());
+        return LeafBucketCollector.NO_OP_COLLECTOR;
     }
 
     @Override

+ 16 - 1
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java

@@ -159,6 +159,13 @@ public abstract class AggregationContext implements Releasable {
      */
     public abstract Query buildQuery(QueryBuilder builder) throws IOException;
 
+    /**
+     * Add filters from slice or filtered aliases. If you make a new query
+     * and don't combine it with the {@link #query() top level query} then
+     * you must provide it to this method.
+     */
+    public abstract Query filterQuery(Query query);
+
     /**
      * The settings for the index against which this search is running.
      */
@@ -256,6 +263,7 @@ public abstract class AggregationContext implements Releasable {
         private final int randomSeed;
         private final LongSupplier relativeTimeInMillis;
         private final Supplier<Boolean> isCancelled;
+        private final Function<Query, Query> filterQuery;
 
         private final List<Aggregator> releaseMe = new ArrayList<>();
 
@@ -270,7 +278,8 @@ public abstract class AggregationContext implements Releasable {
             BitsetFilterCache bitsetFilterCache,
             int randomSeed,
             LongSupplier relativeTimeInMillis,
-            Supplier<Boolean> isCancelled
+            Supplier<Boolean> isCancelled,
+            Function<Query, Query> filterQuery
         ) {
             this.context = context;
             if (bytesToPreallocate == 0) {
@@ -300,6 +309,7 @@ public abstract class AggregationContext implements Releasable {
             this.randomSeed = randomSeed;
             this.relativeTimeInMillis = relativeTimeInMillis;
             this.isCancelled = isCancelled;
+            this.filterQuery = filterQuery;
         }
 
         @Override
@@ -375,6 +385,11 @@ public abstract class AggregationContext implements Releasable {
             return Rewriteable.rewrite(builder, context, true).toQuery(context);
         }
 
+        @Override
+        public Query filterQuery(Query query) {
+            return filterQuery.apply(query);
+        }
+
         @Override
         public IndexSettings getIndexSettings() {
             return context.getIndexSettings();

+ 7 - 5
server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java

@@ -18,9 +18,11 @@ import org.elasticsearch.index.mapper.CustomTermFreqField;
 import org.elasticsearch.index.mapper.DocCountFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
-import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
+import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 
 import java.io.IOException;
 import java.util.List;
@@ -88,10 +90,10 @@ public class DocCountProviderTests extends AggregatorTestCase {
 
     private void testAggregation(Query query,
                                  CheckedConsumer<RandomIndexWriter, IOException> indexer,
-                                 Consumer<InternalGlobal> verify) throws IOException {
-        GlobalAggregationBuilder aggregationBuilder = new GlobalAggregationBuilder("_name");
+                                 Consumer<InternalFilter> verify) throws IOException {
+        AggregationBuilder builder = new FilterAggregationBuilder("f", new MatchAllQueryBuilder());
         MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NUMBER_FIELD, NumberFieldMapper.NumberType.LONG);
         MappedFieldType docCountFieldType = new DocCountFieldMapper.DocCountFieldType();
-        testCase(aggregationBuilder, query, indexer, verify, fieldType, docCountFieldType);
+        testCase(builder, query, indexer, verify, fieldType, docCountFieldType);
     }
 }

+ 25 - 31
server/src/test/java/org/elasticsearch/search/aggregations/bucket/GlobalAggregatorTests.java

@@ -8,33 +8,29 @@
 
 package org.elasticsearch.search.aggregations.bucket;
 
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.SortedNumericDocValuesField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.RandomIndexWriter;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
+import org.apache.lucene.search.Query;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
 import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
 import org.elasticsearch.search.aggregations.metrics.InternalMin;
 import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.function.BiConsumer;
 
-import static java.util.Collections.singleton;
-
 public class GlobalAggregatorTests extends AggregatorTestCase {
     public void testNoDocs() throws IOException {
         testCase(iw -> {
             // Intentionally not writing any docs
-        }, (global, min) -> {
+        }, new MatchAllDocsQuery(), (global, min) -> {
             assertEquals(0, global.getDocCount());
             assertEquals(Double.POSITIVE_INFINITY, min.getValue(), 0);
         });
@@ -42,38 +38,36 @@ public class GlobalAggregatorTests extends AggregatorTestCase {
 
     public void testSomeDocs() throws IOException {
         testCase(iw -> {
-            iw.addDocument(singleton(new SortedNumericDocValuesField("number", 7)));
-            iw.addDocument(singleton(new SortedNumericDocValuesField("number", 1)));
-        }, (global, min) -> {
+            iw.addDocument(List.of(new SortedNumericDocValuesField("number", 7)));
+            iw.addDocument(List.of(new SortedNumericDocValuesField("number", 1)));
+        }, new MatchAllDocsQuery(), (global, min) -> {
             assertEquals(2, global.getDocCount());
             assertEquals(1, min.getValue(), 0);
         });
     }
 
-    // Note that `global`'s fancy support for ignoring the query comes from special code in AggregationPhase. We don't test that here.
-
-    private void testCase(CheckedConsumer<RandomIndexWriter, IOException> buildIndex, BiConsumer<InternalGlobal, InternalMin> verify)
-            throws IOException {
-        Directory directory = newDirectory();
-        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
-        buildIndex.accept(indexWriter);
-        indexWriter.close();
-
-        IndexReader indexReader = DirectoryReader.open(directory);
-        IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+    public void testIgnoresQuery() throws IOException {
+        testCase(iw -> {
+            iw.addDocument(List.of(new SortedNumericDocValuesField("number", 7)));
+            iw.addDocument(List.of(new SortedNumericDocValuesField("number", 1)));
+        }, LongPoint.newRangeQuery("number", 2, Long.MAX_VALUE), (global, min) -> {
+            assertEquals(2, global.getDocCount());
+            assertEquals(1, min.getValue(), 0);
+        });
+    }
 
+    private void testCase(
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+        Query topLevelQuery,
+        BiConsumer<InternalGlobal, InternalMin> verify
+    ) throws IOException {
         GlobalAggregationBuilder aggregationBuilder = new GlobalAggregationBuilder("_name");
         aggregationBuilder.subAggregation(new MinAggregationBuilder("in_global").field("number"));
         MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG);
 
-        GlobalAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
-        aggregator.preCollection();
-        indexSearcher.search(new MatchAllDocsQuery(), aggregator);
-        aggregator.postCollection();
-        InternalGlobal result = (InternalGlobal) aggregator.buildTopLevel();
-        verify.accept(result, (InternalMin) result.getAggregations().asMap().get("in_global"));
-
-        indexReader.close();
-        directory.close();
+        testCase(aggregationBuilder, topLevelQuery, buildIndex, (InternalGlobal result) -> {
+            InternalMin min = result.getAggregations().get("in_global");
+            verify.accept(result, min);
+        }, fieldType);
     }
 }

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java

@@ -396,6 +396,11 @@ public abstract class MapperServiceTestCase extends ESTestCase {
                 throw new UnsupportedOperationException();
             }
 
+            @Override
+            public Query filterQuery(Query query) {
+                throw new UnsupportedOperationException();
+            }
+
             @Override
             protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
                 return ft.fielddataBuilder("test", null).build(new IndexFieldDataCache.None(), new NoneCircuitBreakerService());

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -292,7 +292,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
             bitsetFilterCache,
             randomInt(),
             () -> 0L,
-            () -> false
+            () -> false,
+            q -> q
         );
         releasables.add(context);
         return context;