Ver código fonte

Move top-level pipeline aggs out of QuerySearchResult (#40319)

As part of #40177 we have added top-level pipeline aggs to
`InternalAggregations`. Given that `QuerySearchResult` holds an
`InternalAggregations` instance, there is no need to keep on setting
top-level pipeline aggs separately. Top-level pipeline aggs can then
always be transported through `InternalAggregations`. Such change is
made in a backwards compatible manner.
Luca Cavanna 6 anos atrás
pai
commit
b4d88aad3a

+ 2 - 2
build.gradle

@@ -162,8 +162,8 @@ task verifyVersions {
  * after the backport of the backcompat code is complete.
  */
 
-boolean bwc_tests_enabled = true
-final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
+boolean bwc_tests_enabled = false
+final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40319" /* place a PR link here when committing bwc changes */
 if (bwc_tests_enabled == false) {
   if (bwc_tests_disabled_issue.isEmpty()) {
     throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

@@ -486,7 +486,7 @@ public final class SearchPhaseController {
         }
         ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
         final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
-            InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
+            InternalAggregations.reduce(aggregationsList, reduceContext);
         final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
         final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
             reducedCompletionSuggestions);

+ 1 - 2
server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

@@ -132,7 +132,6 @@ public class AggregationPhase implements SearchPhase {
                 throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
             }
         }
-        context.queryResult().aggregations(new InternalAggregations(aggregations));
         List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
         List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
         for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
@@ -144,7 +143,7 @@ public class AggregationPhase implements SearchPhase {
                     + "allowed at the top level");
             }
         }
-        context.queryResult().pipelineAggregators(siblingPipelineAggregators);
+        context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
 
         // disable aggregations so that they don't run on next pages in case of scrolling
         context.aggregations(null);

+ 2 - 15
server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

@@ -77,7 +77,7 @@ public final class InternalAggregations extends Aggregations implements Streamab
      * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
      * become part of the list of {@link InternalAggregation}s.
      */
-    List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
+    public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
         return topLevelPipelineAggregators;
     }
 
@@ -91,20 +91,7 @@ public final class InternalAggregations extends Aggregations implements Streamab
         if (aggregationsList.isEmpty()) {
             return null;
         }
-        InternalAggregations first = aggregationsList.get(0);
-        return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
-    }
-
-    /**
-     * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
-     * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
-     */
-    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
-                                              List<SiblingPipelineAggregator> topLevelPipelineAggregators,
-                                              ReduceContext context) {
-        if (aggregationsList.isEmpty()) {
-            return null;
-        }
+        List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
 
         // first we collect all aggregations of the same type and list them together
         Map<String, List<InternalAggregation>> aggByName = new HashMap<>();

+ 24 - 13
server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

@@ -21,6 +21,7 @@ package org.elasticsearch.search.query;
 
 import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
@@ -28,6 +29,7 @@ import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -37,7 +39,6 @@ import org.elasticsearch.search.suggest.Suggest;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
@@ -54,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
     private DocValueFormat[] sortValueFormats;
     private InternalAggregations aggregations;
     private boolean hasAggs;
-    private List<SiblingPipelineAggregator> pipelineAggregators = Collections.emptyList();
     private Suggest suggest;
     private boolean searchTimedOut;
     private Boolean terminatedEarly = null;
@@ -198,14 +198,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
         hasProfileResults = shardResults != null;
     }
 
-    public List<SiblingPipelineAggregator> pipelineAggregators() {
-        return pipelineAggregators;
-    }
-
-    public void pipelineAggregators(List<SiblingPipelineAggregator> pipelineAggregators) {
-        this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators);
-    }
-
     public Suggest suggest() {
         return suggest;
     }
@@ -294,8 +286,18 @@ public final class QuerySearchResult extends SearchPhaseResult {
         if (hasAggs = in.readBoolean()) {
             aggregations = InternalAggregations.readAggregations(in);
         }
-        pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
-                .collect(Collectors.toList());
+        if (in.getVersion().before(Version.V_7_1_0)) {
+            List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
+                .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
+            if (hasAggs && pipelineAggregators.isEmpty() == false) {
+                List<InternalAggregation> internalAggs = aggregations.asList().stream()
+                    .map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
+                //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
+                //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
+                //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
+                this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
+            }
+        }
         if (in.readBoolean()) {
             suggest = new Suggest(in);
         }
@@ -332,7 +334,16 @@ public final class QuerySearchResult extends SearchPhaseResult {
             out.writeBoolean(true);
             aggregations.writeTo(out);
         }
-        out.writeNamedWriteableList(pipelineAggregators);
+        if (out.getVersion().before(Version.V_7_1_0)) {
+            //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
+            //while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
+            //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
+            if (aggregations == null) {
+                out.writeNamedWriteableList(Collections.emptyList());
+            } else {
+                out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
+            }
+        }
         if (suggest == null) {
             out.writeBoolean(false);
         } else {

+ 13 - 8
server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

@@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase {
     public void testReduceEmptyAggs() {
         List<InternalAggregations> aggs = Collections.emptyList();
         InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean());
-        assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext));
+        assertNull(InternalAggregations.reduce(aggs, reduceContext));
     }
 
     public void testNonFinalReduceTopLevelPipelineAggs()  {
         InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
             10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
-        List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms)));
         List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
         MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
         topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
+        List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
+            topLevelPipelineAggs));
         InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
-        InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext);
+        InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
         assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
         assertEquals(1, reducedAggs.aggregations.size());
     }
@@ -79,15 +80,15 @@ public class InternalAggregationsTests extends ESTestCase {
                 Collections.singletonList(siblingPipelineAggregator));
             reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
         } else {
-            InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
-            List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator);
-            reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext);
+            InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
+                Collections.singletonList(siblingPipelineAggregator));
+            reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
         }
         assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
         assertEquals(2, reducedAggs.aggregations.size());
     }
 
-    public void testSerialization() throws Exception {
+    public static InternalAggregations createTestInstance() throws Exception {
         List<InternalAggregation> aggsList = new ArrayList<>();
         if (randomBoolean()) {
             StringTermsTests stringTermsTests = new StringTermsTests();
@@ -116,7 +117,11 @@ public class InternalAggregationsTests extends ESTestCase {
                 topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
             }
         }
-        InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs);
+        return new InternalAggregations(aggsList, topLevelPipelineAggs);
+    }
+
+    public void testSerialization() throws Exception {
+        InternalAggregations aggregations = createTestInstance();
         writeToAndReadFrom(aggregations, 0);
     }
 

+ 102 - 0
server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.query;
+
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalAggregationsTests;
+import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
+import org.elasticsearch.search.suggest.SuggestTests;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
+
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+
+public class QuerySearchResultTests extends ESTestCase {
+
+    private final NamedWriteableRegistry namedWriteableRegistry;
+
+    public QuerySearchResultTests() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
+        this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
+    }
+
+    private static QuerySearchResult createTestInstance() throws Exception {
+        ShardId shardId = new ShardId("index", "uuid", randomInt());
+        QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
+        if (randomBoolean()) {
+            result.terminatedEarly(randomBoolean());
+        }
+        TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
+        result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);
+        result.size(randomInt());
+        result.from(randomInt());
+        if (randomBoolean()) {
+            result.suggest(SuggestTests.createTestItem());
+        }
+        if (randomBoolean()) {
+            result.aggregations(InternalAggregationsTests.createTestInstance());
+        }
+        return result;
+    }
+
+    public void testSerialization() throws Exception {
+        QuerySearchResult querySearchResult = createTestInstance();
+        Version version = VersionUtils.randomVersion(random());
+        QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
+        assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
+        assertNull(deserialized.getSearchShardTarget());
+        assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
+        assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
+        assertEquals(querySearchResult.from(), deserialized.from());
+        assertEquals(querySearchResult.size(), deserialized.size());
+        assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
+        if (deserialized.hasAggs()) {
+            Aggregations aggs = querySearchResult.consumeAggs();
+            Aggregations deserializedAggs = deserialized.consumeAggs();
+            assertEquals(aggs.asList(), deserializedAggs.asList());
+            List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
+            List<SiblingPipelineAggregator> deserializedPipelineAggs =
+                ((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
+            assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
+            for (int i = 0; i < pipelineAggs.size(); i++) {
+                SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
+                SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
+                assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
+                assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
+            }
+        }
+        assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
+    }
+}