
Disable filter by filter optimization when agg context requires in sort order execution (#91702)

Before this change `TimeSeriesIndexSearcher` caused a
`CollectionTerminatedException`, because
`FilterByFilterAggregator#getLeafCollector()` always returns a
`LeafBucketCollector.NO_OP_COLLECTOR` instance and
`TimeSeriesIndexSearcher` can't deal with that when initializing its
leaf walkers.
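
To make the failure mode concrete, below is a minimal, self-contained sketch of the interaction described above. None of these classes are the real Elasticsearch or Lucene types; `SketchLeafCollector`, `FilterByFilterLikeCollector` and `InSortOrderSearcher` are made-up stand-ins for `LeafBucketCollector`, `FilterByFilterAggregator` and `TimeSeriesIndexSearcher`, and the real control flow is more involved.

```java
import java.util.ArrayList;
import java.util.List;

public class NoOpLeafCollectorSketch {

    /** Stand-in for Lucene's CollectionTerminatedException. */
    static class CollectionTerminatedException extends RuntimeException {}

    /** Per-segment collector; NO_OP signals "this segment was already fully handled". */
    interface SketchLeafCollector {
        SketchLeafCollector NO_OP = doc -> {};

        void collect(int doc);
    }

    /** Aggregator-side view: hands out one leaf collector per segment. */
    interface SketchCollector {
        SketchLeafCollector getLeafCollector(int segmentOrd);
    }

    /** Filter-by-filter style: do the per-segment work up front, then return NO_OP. */
    static class FilterByFilterLikeCollector implements SketchCollector {
        @Override
        public SketchLeafCollector getLeafCollector(int segmentOrd) {
            // ... count matching docs for each filter in this segment ...
            return SketchLeafCollector.NO_OP; // nothing left to collect doc by doc
        }
    }

    /** Time-series style: needs a live collector for every segment before walking tsids. */
    static class InSortOrderSearcher {
        void search(int segmentCount, SketchCollector collector) {
            List<SketchLeafCollector> leafWalkers = new ArrayList<>();
            for (int segmentOrd = 0; segmentOrd < segmentCount; segmentOrd++) {
                SketchLeafCollector leaf = collector.getLeafCollector(segmentOrd);
                if (leaf == SketchLeafCollector.NO_OP) {
                    // A no-op walker cannot make progress; this is the situation that
                    // surfaced as a CollectionTerminatedException before this change.
                    throw new CollectionTerminatedException();
                }
                leafWalkers.add(leaf);
            }
            // ... interleave leafWalkers by tsid and timestamp ...
        }
    }

    public static void main(String[] args) {
        try {
            new InSortOrderSearcher().search(2, new FilterByFilterLikeCollector());
        } catch (CollectionTerminatedException e) {
            System.out.println("in-sort-order searcher cannot work with no-op leaf collectors");
        }
    }
}
```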

Maybe there is a way to keep the filter by filter optimization with
`TimeSeriesIndexSearcher`. The tricky part is that the filter by filter
optimization executes when the reference to the leaf bucket collector is
obtained; at that point `TimeSeriesIndexSearcher` hasn't yet initialized
the other leaves or the tsid accounting.

This commit ensures that it is at least possible to run aggregations
that use the filter by filter optimization and have a time series agg as
a sub agg.
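
The core of the change is one extra condition on the fast path's validity check (the real code is the `this.valid = ...` line in the `FilterByFilterAggregator` diff below, using `aggCtx.isInSortOrderExecutionRequired()`). Here is a minimal sketch of that decision, with a hypothetical `AggContext` record standing in for the aggregation execution context:

```java
/** Hypothetical stand-in for AggregationExecutionContext#isInSortOrderExecutionRequired(). */
record AggContext(boolean inSortOrderExecutionRequired) {}

class FilterByFilterGuard {
    /**
     * The filter-by-filter fast path is only valid for a top-level filters agg without an
     * "other" bucket, and now also only when no in-sort-order (time series) execution is required.
     */
    static boolean canUseFilterByFilter(Object parent, String otherBucketKey, AggContext aggCtx) {
        return parent == null && otherBucketKey == null && aggCtx.inSortOrderExecutionRequired() == false;
    }

    public static void main(String[] args) {
        // Under a time_series agg the optimization is now skipped:
        System.out.println(canUseFilterByFilter(null, null, new AggContext(true)));  // false
        System.out.println(canUseFilterByFilter(null, null, new AggContext(false))); // true
    }
}
```

With the fast path disabled in that case, the regular filters implementation runs and hands real per-leaf collectors to `TimeSeriesIndexSearcher`, which is what the new `TimeSeriesAggregatorTests` case in this diff exercises.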

This commit also:
* removes unused code in `FilterByFilterAggregator`
* increments the `segmentsWithDeletedDocs` field when a segment has live docs (i.e. contains deleted documents).
Martijn van Groningen authored 2 years ago · commit 6b7e8ffeaf

+ 36 - 0
modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.aggregations.bucket.timeseries;
 
 import org.apache.lucene.document.DoubleDocValuesField;
 import org.apache.lucene.document.FloatDocValuesField;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedDocValuesField;
 import org.apache.lucene.document.SortedNumericDocValuesField;
@@ -26,6 +27,9 @@ import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
 import org.elasticsearch.search.aggregations.metrics.Sum;
 import org.elasticsearch.search.aggregations.support.ValuesSourceType;
 
@@ -77,6 +81,7 @@ public class TimeSeriesAggregatorTests extends AggregationTestCase {
     public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
         final List<IndexableField> fields = new ArrayList<>();
         fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
+        fields.add(new LongPoint(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
         final TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder(null);
         for (int i = 0; i < dimensions.length; i += 2) {
             if (dimensions[i + 1]instanceof Number n) {
@@ -99,6 +104,37 @@ public class TimeSeriesAggregatorTests extends AggregationTestCase {
         iw.addDocument(fields);
     }
 
+    public void testWithDateHistogramExecutedAsFilterByFilterWithTimeSeriesIndexSearcher() throws IOException {
+        DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("by_timestamp").field("@timestamp")
+            .fixedInterval(DateHistogramInterval.HOUR)
+            .subAggregation(new TimeSeriesAggregationBuilder("ts").subAggregation(sum("sum").field("val1")));
+
+        // Before this threw a CollectionTerminatedException because FilterByFilterAggregation#getLeafCollector() always returns a
+        // LeafBucketCollector.NO_OP_COLLECTOR instance. And TimeSeriesIndexSearcher can't deal with this when initializing the
+        // leaf walkers.
+        testCase(iw -> {
+            long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
+            for (int i = 1; i <= 5000; i++) {
+                writeTS(iw, startTime++, new Object[] { "dim1", "aaa" }, new Object[] { "val1", 1 });
+            }
+        }, internalAggregation -> {
+            InternalDateHistogram dateHistogram = (InternalDateHistogram) internalAggregation;
+            assertThat(dateHistogram.getBuckets(), hasSize(1));
+            InternalTimeSeries timeSeries = dateHistogram.getBuckets().get(0).getAggregations().get("ts");
+            assertThat(timeSeries.getBuckets(), hasSize(1));
+            Sum sum = timeSeries.getBuckets().get(0).getAggregations().get("sum");
+            assertThat(sum.value(), equalTo(5000.0));
+        },
+            new AggTestConfig(
+                aggregationBuilder,
+                TimeSeriesIdFieldMapper.FIELD_TYPE,
+                new DateFieldMapper.DateFieldType("@timestamp"),
+                new KeywordFieldMapper.KeywordFieldType("dim1"),
+                new NumberFieldMapper.NumberFieldType("val1", NumberFieldMapper.NumberType.INTEGER)
+            ).withQuery(new MatchAllDocsQuery())
+        );
+    }
+
     private void timeSeriesTestCase(
         TimeSeriesAggregationBuilder builder,
         Query query,

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java

@@ -46,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSear
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 
 @ESIntegTestCase.SuiteScopeTestCase
@@ -690,7 +691,7 @@ public class AggregationProfilerIT extends ESIntegTestCase {
                             .entry("delegate", "FilterByFilterAggregator")
                             .entry(
                                 "delegate_debug",
-                                matchesMap().entry("segments_with_deleted_docs", 0)
+                                matchesMap().entry("segments_with_deleted_docs", greaterThanOrEqualTo(0))
                                     .entry("segments_with_doc_count_field", 0)
                                     .entry("segments_counted", 0)
                                     .entry("segments_collected", greaterThan(0))

+ 4 - 23
server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterByFilterAggregator.java

@@ -13,7 +13,6 @@ import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.util.Bits;
-import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.search.aggregations.AdaptingAggregator;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -70,7 +69,7 @@ public class FilterByFilterAggregator extends FiltersAggregator {
             this.cardinality = cardinality;
             this.metadata = metadata;
             this.rewrittenTopLevelQuery = aggCtx.searcher().rewrite(aggCtx.query());
-            this.valid = parent == null && otherBucketKey == null;
+            this.valid = parent == null && otherBucketKey == null && aggCtx.isInSortOrderExecutionRequired() == false;
         }
 
         /**
@@ -224,6 +223,9 @@ public class FilterByFilterAggregator extends FiltersAggregator {
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
         Bits live = aggCtx.getLeafReaderContext().reader().getLiveDocs();
+        if (live != null) {
+            segmentsWithDeletedDocs++;
+        }
         if (false == docCountProvider.alwaysOne()) {
             segmentsWithDocCountField++;
         }
@@ -304,25 +306,4 @@ public class FilterByFilterAggregator extends FiltersAggregator {
         add.accept("segments_with_doc_count_field", segmentsWithDocCountField);
     }
 
-    CheckedSupplier<Boolean, IOException> canUseMetadata(LeafReaderContext ctx) {
-        return new CheckedSupplier<Boolean, IOException>() {
-            Boolean canUse;
-
-            @Override
-            public Boolean get() throws IOException {
-                if (canUse == null) {
-                    canUse = canUse();
-                }
-                return canUse;
-            }
-
-            private boolean canUse() throws IOException {
-                if (ctx.reader().getLiveDocs() != null) {
-                    return false;
-                }
-                docCountProvider.setLeafReaderContext(ctx);
-                return docCountProvider.alwaysOne();
-            }
-        };
-    }
 }

+ 21 - 3
server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java

@@ -665,6 +665,12 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                     LongPoint.newRangeQuery("t", 5, Long.MAX_VALUE)
                 );
                 IndexSearcher searcher = newIndexSearcher(limitedReader);
+                int segmentsWithLiveDocs = (int) searcher.getIndexReader()
+                    .leaves()
+                    .stream()
+                    .map(LeafReaderContext::reader)
+                    .filter(leafReader -> leafReader.getLiveDocs() != null)
+                    .count();
                 debugTestCase(
                     builder,
                     new MatchAllDocsQuery(),
@@ -679,7 +685,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                                 matchesMap().entry("segments_counted", greaterThanOrEqualTo(1))
                                     .entry("segments_collected", 0)
                                     .entry("segments_with_doc_count_field", 0)
-                                    .entry("segments_with_deleted_docs", 0)
+                                    .entry("segments_with_deleted_docs", segmentsWithLiveDocs)
                                     .entry(
                                         "filters",
                                         matchesList().item(
@@ -730,6 +736,12 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                     LongPoint.newRangeQuery("t", 5, Long.MAX_VALUE)
                 );
                 IndexSearcher searcher = newIndexSearcher(limitedReader);
+                int segmentsWithLiveDocs = (int) searcher.getIndexReader()
+                    .leaves()
+                    .stream()
+                    .map(LeafReaderContext::reader)
+                    .filter(leafReader -> leafReader.getLiveDocs() != null)
+                    .count();
                 debugTestCase(
                     builder,
                     new MatchAllDocsQuery(),
@@ -744,7 +756,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                                 matchesMap().entry("segments_counted", greaterThanOrEqualTo(1))
                                     .entry("segments_collected", 0)
                                     .entry("segments_with_doc_count_field", 0)
-                                    .entry("segments_with_deleted_docs", 0)
+                                    .entry("segments_with_deleted_docs", segmentsWithLiveDocs)
                                     .entry(
                                         "filters",
                                         matchesList().item(
@@ -792,6 +804,12 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                     LongPoint.newRangeQuery("t", Long.MIN_VALUE, Long.MAX_VALUE)
                 );
                 IndexSearcher searcher = newIndexSearcher(limitedReader);
+                int segmentsWithLiveDocs = (int) searcher.getIndexReader()
+                    .leaves()
+                    .stream()
+                    .map(LeafReaderContext::reader)
+                    .filter(leafReader -> leafReader.getLiveDocs() != null)
+                    .count();
 
 
                 debugTestCase(
                     builder,
@@ -807,7 +825,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
                                 matchesMap().entry("segments_counted", greaterThanOrEqualTo(1))
                                     .entry("segments_collected", 0)
                                     .entry("segments_with_doc_count_field", 0)
-                                    .entry("segments_with_deleted_docs", 0)
+                                    .entry("segments_with_deleted_docs", segmentsWithLiveDocs)
                                     .entry(
                                         "filters",
                                         matchesList().item(