浏览代码

Change filter aggregation to execute like filters aggregation (#90318)

This change attempts to do this by changing the
`FilterAggregationBuilder` to use the `FiltersAggregatorFactory` instead
of the `FilterAggregatorFactory`. This is wrapped by another
`AggregatorFactory` instance that uses an `AdaptingAggregator`
to translate the results back to `InternalFilter`, so that the end
result stays the same, but the `filter` aggregation can make use of the
optimizations that `filters` provides.

The change also removes the `FilterAggregatorFactory`,
`FilterAggregator` and `FilterAggregatorTests` classes.

Closes #90076
Martijn van Groningen 3 年之前
父节点
当前提交
ad00fafc3d

+ 1 - 2
server/src/main/java/org/elasticsearch/search/aggregations/CardinalityUpperBound.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.search.aggregations;
 
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
-import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregator;
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator;
 
 import java.util.function.IntFunction;
@@ -40,7 +39,7 @@ public abstract class CardinalityUpperBound {
      * {@link Aggregator}s with this cardinality will collect be collected
      * once or zero times. This will only be true for top level {@linkplain Aggregator}s
      * and for sub-aggregator's who's ancestors are all single-bucket
-     * aggregations like {@link FilterAggregator} or a {@link RangeAggregator}
+     * aggregations like a {@link RangeAggregator}
      * configured to collect only a single range.
      */
     public static final CardinalityUpperBound ONE = new KnownCardinalityUpperBound(1);

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java

@@ -16,7 +16,7 @@ import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
-import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
@@ -175,7 +175,7 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
         if (factory == null) {
             return null;
         } else if (factory instanceof NestedAggregatorFactory
-            || factory instanceof FilterAggregatorFactory
+            || factory instanceof FilterAggregationBuilder.FilterAggregatorFactory
             || factory instanceof RandomSamplerAggregatorFactory) {
                 return validateParentAggregations(factory.getParent());
             } else {

+ 83 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregationBuilder.java

@@ -15,14 +15,23 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
+import org.elasticsearch.search.aggregations.AdaptingAggregator;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.AggregationPath;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -101,7 +110,7 @@ public class FilterAggregationBuilder extends AbstractAggregationBuilder<FilterA
         AggregatorFactory parent,
         AggregatorFactories.Builder subFactoriesBuilder
     ) throws IOException {
-        return new FilterAggregatorFactory(name, filter, context, parent, subFactoriesBuilder, metadata);
+        return new FilterAggregatorFactory(filter, name, context, parent, subFactoriesBuilder, metadata);
     }
 
     @Override
@@ -144,4 +153,77 @@ public class FilterAggregationBuilder extends AbstractAggregationBuilder<FilterA
     public Version getMinimalSupportedVersion() {
         return Version.V_EMPTY;
     }
+
+    public static class FilterAggregatorFactory extends AggregatorFactory {
+
+        private final QueryToFilterAdapter filter;
+
+        public FilterAggregatorFactory(
+            QueryBuilder filter,
+            String name,
+            AggregationContext context,
+            AggregatorFactory parent,
+            AggregatorFactories.Builder subFactoriesBuilder,
+            Map<String, Object> metadata
+        ) throws IOException {
+            super(name, context, parent, subFactoriesBuilder, metadata);
+            this.filter = QueryToFilterAdapter.build(context.searcher(), "1", context.buildQuery(filter));
+            ;
+        }
+
+        @Override
+        protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
+            throws IOException {
+            final var innerAggregator = FiltersAggregator.build(
+                name,
+                factories,
+                List.of(filter),
+                false,
+                null,
+                context,
+                parent,
+                cardinality,
+                metadata
+            );
+            return new FilterAggregator(name, parent, factories, innerAggregator);
+        }
+    }
+
+    static class FilterAggregator extends AdaptingAggregator implements SingleBucketAggregator {
+
+        private final String name;
+        private final FiltersAggregator innerAggregator;
+
+        FilterAggregator(String name, Aggregator parent, AggregatorFactories subAggregators, FiltersAggregator innerAggregator)
+            throws IOException {
+            super(parent, subAggregators, aggregatorFactories -> innerAggregator);
+            this.name = name;
+            this.innerAggregator = innerAggregator;
+        }
+
+        @Override
+        protected InternalAggregation adapt(InternalAggregation delegateResult) throws IOException {
+            InternalFilters innerResult = (InternalFilters) delegateResult;
+            var innerBucket = innerResult.getBuckets().get(0);
+            return new InternalFilter(name, innerBucket.getDocCount(), innerBucket.getAggregations(), innerResult.getMetadata());
+        }
+
+        @Override
+        public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
+            return resolveSortPathOnValidAgg(next, path);
+        }
+
+        @Override
+        public BucketComparator bucketComparator(String key, SortOrder order) {
+            if (key == null || "doc_count".equals(key)) {
+                return (lhs, rhs) -> order.reverseMul() * Long.compare(
+                    innerAggregator.bucketDocCount(lhs),
+                    innerAggregator.bucketDocCount(rhs)
+                );
+            } else {
+                return super.bucketComparator(key, order);
+            }
+        }
+
+    }
 }

+ 0 - 82
server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java

@@ -1,82 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-package org.elasticsearch.search.aggregations.bucket.filter;
-
-import org.apache.lucene.search.Weight;
-import org.apache.lucene.util.Bits;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.search.aggregations.AggregationExecutionContext;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.CardinalityUpperBound;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
-import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
-import org.elasticsearch.search.aggregations.support.AggregationContext;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.function.Supplier;
-
-/**
- * Aggregate all docs that match a filter.
- */
-public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator {
-
-    private final Supplier<Weight> filter;
-
-    public FilterAggregator(
-        String name,
-        Supplier<Weight> filter,
-        AggregatorFactories factories,
-        AggregationContext context,
-        Aggregator parent,
-        CardinalityUpperBound cardinality,
-        Map<String, Object> metadata
-    ) throws IOException {
-        super(name, factories, context, parent, cardinality, metadata);
-        this.filter = filter;
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
-        // no need to provide deleted docs to the filter
-        final Bits bits = Lucene.asSequentialAccessBits(
-            aggCtx.getLeafReaderContext().reader().maxDoc(),
-            filter.get().scorerSupplier(aggCtx.getLeafReaderContext())
-        );
-        return new LeafBucketCollectorBase(sub, null) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                if (bits.get(doc)) {
-                    collectBucket(sub, doc, bucket);
-                }
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        return buildAggregationsForSingleBucket(
-            owningBucketOrds,
-            (owningBucketOrd, subAggregationResults) -> new InternalFilter(
-                name,
-                bucketDocCount(owningBucketOrd),
-                subAggregationResults,
-                metadata()
-            )
-        );
-    }
-
-    @Override
-    public InternalAggregation buildEmptyAggregation() {
-        return new InternalFilter(name, 0, buildEmptySubAggregations(), metadata());
-    }
-}

+ 0 - 70
server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java

@@ -1,70 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.search.aggregations.bucket.filter;
-
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Weight;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.aggregations.AggregationInitializationException;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.AggregatorFactory;
-import org.elasticsearch.search.aggregations.CardinalityUpperBound;
-import org.elasticsearch.search.aggregations.support.AggregationContext;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class FilterAggregatorFactory extends AggregatorFactory {
-
-    private final Query filter;
-    private Weight weight;
-
-    public FilterAggregatorFactory(
-        String name,
-        QueryBuilder filterBuilder,
-        AggregationContext context,
-        AggregatorFactory parent,
-        AggregatorFactories.Builder subFactoriesBuilder,
-        Map<String, Object> metadata
-    ) throws IOException {
-        super(name, context, parent, subFactoriesBuilder, metadata);
-        filter = context.buildQuery(filterBuilder);
-    }
-
-    /**
-     * Returns the {@link Weight} for this filter aggregation, creating it if
-     * necessary. This is done lazily so that the {@link Weight} is only created
-     * if the aggregation collects documents reducing the overhead of the
-     * aggregation in the case where no documents are collected.
-     *
-     * Note that as aggregations are initialsed and executed in a serial manner,
-     * no concurrency considerations are necessary here.
-     */
-    public Weight getWeight() {
-        if (weight == null) {
-            IndexSearcher contextSearcher = context.searcher();
-            try {
-                weight = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
-            } catch (IOException e) {
-                throw new AggregationInitializationException("Failed to initialse filter", e);
-            }
-        }
-        return weight;
-    }
-
-    @Override
-    public Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
-        throws IOException {
-        return new FilterAggregator(name, () -> this.getWeight(), factories, context, parent, cardinality, metadata);
-    }
-
-}

+ 0 - 138
server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java

@@ -1,138 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-package org.elasticsearch.search.aggregations.bucket.filter;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.tests.index.RandomIndexWriter;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.aggregations.AggregationExecutionContext;
-import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
-import org.elasticsearch.search.aggregations.AggregatorTestCase;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
-import org.elasticsearch.search.sort.SortOrder;
-import org.junit.Before;
-
-import java.io.IOException;
-
-import static java.util.Collections.singleton;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-
-public class FilterAggregatorTests extends AggregatorTestCase {
-    private MappedFieldType fieldType;
-
-    @Before
-    public void setUpTest() throws Exception {
-        super.setUp();
-        fieldType = new KeywordFieldMapper.KeywordFieldType("field");
-    }
-
-    public void testEmpty() throws Exception {
-        Directory directory = newDirectory();
-        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
-        indexWriter.close();
-        IndexReader indexReader = DirectoryReader.open(directory);
-        IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-        QueryBuilder filter = QueryBuilders.termQuery("field", randomAlphaOfLength(5));
-        FilterAggregationBuilder builder = new FilterAggregationBuilder("test", filter);
-        InternalFilter response = searchAndReduce(new AggTestConfig(indexSearcher, builder, fieldType));
-        assertEquals(response.getDocCount(), 0);
-        assertFalse(AggregationInspectionHelper.hasValue(response));
-        indexReader.close();
-        directory.close();
-    }
-
-    public void testRandom() throws Exception {
-        Directory directory = newDirectory();
-        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
-        int numDocs = randomIntBetween(100, 200);
-        int maxTerm = randomIntBetween(10, 50);
-        int[] expectedBucketCount = new int[maxTerm];
-        Document document = new Document();
-        for (int i = 0; i < numDocs; i++) {
-            if (frequently()) {
-                // make sure we have more than one segment to test the merge
-                indexWriter.commit();
-            }
-            int value = randomInt(maxTerm - 1);
-            expectedBucketCount[value] += 1;
-            document.add(new Field("field", Integer.toString(value), KeywordFieldMapper.Defaults.FIELD_TYPE));
-            indexWriter.addDocument(document);
-            document.clear();
-        }
-        indexWriter.close();
-
-        IndexReader indexReader = DirectoryReader.open(directory);
-        IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-        try {
-
-            int value = randomInt(maxTerm - 1);
-            QueryBuilder filter = QueryBuilders.termQuery("field", Integer.toString(value));
-            FilterAggregationBuilder builder = new FilterAggregationBuilder("test", filter);
-
-            final InternalFilter response = searchAndReduce(new AggTestConfig(indexSearcher, builder, fieldType));
-            assertEquals(response.getDocCount(), (long) expectedBucketCount[value]);
-            if (expectedBucketCount[value] > 0) {
-                assertTrue(AggregationInspectionHelper.hasValue(response));
-            } else {
-                assertFalse(AggregationInspectionHelper.hasValue(response));
-            }
-        } finally {
-            indexReader.close();
-            directory.close();
-        }
-    }
-
-    public void testBucketComparator() throws IOException {
-        try (Directory directory = newDirectory()) {
-            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
-                indexWriter.addDocument(singleton(new Field("field", "1", KeywordFieldMapper.Defaults.FIELD_TYPE)));
-            }
-            try (IndexReader indexReader = DirectoryReader.open(directory)) {
-                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-                FilterAggregationBuilder builder = new FilterAggregationBuilder("test", new MatchAllQueryBuilder());
-                FilterAggregator agg = createAggregator(builder, indexSearcher, fieldType);
-                agg.preCollection();
-                LeafBucketCollector collector = agg.getLeafCollector(
-                    new AggregationExecutionContext(indexReader.leaves().get(0), null, null, null)
-                );
-                collector.collect(0, 0);
-                collector.collect(0, 0);
-                collector.collect(0, 1);
-                BucketComparator c = agg.bucketComparator(null, SortOrder.ASC);
-                assertThat(c.compare(0, 1), greaterThan(0));
-                assertThat(c.compare(1, 0), lessThan(0));
-                c = agg.bucketComparator("doc_count", SortOrder.ASC);
-                assertThat(c.compare(0, 1), greaterThan(0));
-                assertThat(c.compare(1, 0), lessThan(0));
-                Exception e = expectThrows(
-                    IllegalArgumentException.class,
-                    () -> agg.bucketComparator("garbage", randomFrom(SortOrder.values()))
-                );
-                assertThat(
-                    e.getMessage(),
-                    equalTo(
-                        "Ordering on a single-bucket aggregation can only be done on its doc_count. "
-                            + "Either drop the key (a la \"test\") or change it to \"doc_count\" (a la \"test.doc_count\") or \"key\"."
-                    )
-                );
-            }
-        }
-    }
-}