Ver Fonte

Update bucket metric pipeline agg paths to allow intermediate single bucket and bucket qualified multi-bucket aggs (#85729)

Bucket metric pipeline aggregation paths currently always expect the sibling aggregation to be a multi-bucket agg. 

This honestly doesn't have to be the case for bucket metric pipeline aggs to work. 

Consider the following path:

```
filter_agg>filters_agg['bucket_foo']>histo>some_metric_agg
```

Since `filter_agg>filters_agg['bucket_foo']` are well defined and are not crossing bucket threasholds, metrics should still be able to be calculated against the bucket values for `histo`

This commit allows any combination of single bucket aggs (e.g. filter) and bucket specific multi-bucket aggs before reaching the desired multi-bucket used for the metric calculation.
Benjamin Trent há 3 anos atrás
pai
commit
b4668ca868
14 ficheiros alterados com 248 adições e 35 exclusões
  1. 6 0
      docs/changelog/85729.yaml
  2. 30 4
      server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregationBuilder.java
  3. 71 4
      server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java
  4. 28 11
      server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java
  5. 97 0
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java
  6. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketTests.java
  7. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketTests.java
  8. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketTests.java
  9. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MinBucketTests.java
  10. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketTests.java
  11. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketTests.java
  12. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/pipeline/SumBucketTests.java
  13. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/correlation/BucketCorrelationAggregationBuilderTests.java
  14. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/kstest/BucketCountKSTestAggregationBuilderTests.java

+ 6 - 0
docs/changelog/85729.yaml

@@ -0,0 +1,6 @@
+pr: 85729
+summary: Update bucket metric pipeline agg paths to allow intermediate single bucket
+  and bucket qualified multi-bucket aggs
+area: Aggregations
+type: enhancement
+issues: []

+ 30 - 4
server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregationBuilder.java

@@ -17,6 +17,7 @@ import org.elasticsearch.search.aggregations.support.AggregationPath;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -105,20 +106,45 @@ public abstract class BucketMetricsPipelineAggregationBuilder<AF extends BucketM
             return;
         }
         // find the first agg name in the buckets path to check its a multi bucket agg
-        final String firstAgg = AggregationPath.parse(bucketsPaths[0]).getPathElementsAsStringList().get(0);
+        List<AggregationPath.PathElement> path = AggregationPath.parse(bucketsPaths[0]).getPathElements();
+        int pathPos = 0;
+        AggregationPath.PathElement currentAgg = path.get(pathPos++);
+        final String aggName = currentAgg.name();
         Optional<AggregationBuilder> aggBuilder = context.getSiblingAggregations()
             .stream()
-            .filter(builder -> builder.getName().equals(firstAgg))
+            .filter(builder -> builder.getName().equals(aggName))
             .findAny();
         if (aggBuilder.isEmpty()) {
             context.addBucketPathValidationError("aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
             return;
         }
+
+        // Dig through the aggregation tree to find the first aggregation specified by the path.
+        // The path may have many single bucket aggs (with sub-aggs) or many multi-bucket aggs specified by bucket keys
+        while (aggBuilder.isPresent()
+            && pathPos < path.size()
+            && ((aggBuilder.get().bucketCardinality() == AggregationBuilder.BucketCardinality.MANY
+                && AggregationPath.pathElementContainsBucketKey(currentAgg))
+                || (aggBuilder.get().bucketCardinality() == AggregationBuilder.BucketCardinality.ONE
+                    && aggBuilder.get().getSubAggregations().isEmpty() == false))) {
+            currentAgg = path.get(pathPos++);
+            final String subAggName = currentAgg.name();
+            aggBuilder = aggBuilder.get().getSubAggregations().stream().filter(b -> b.getName().equals(subAggName)).findAny();
+        }
+        if (aggBuilder.isEmpty()) {
+            context.addBucketPathValidationError(
+                "aggregation does not exist for aggregation ["
+                    + name
+                    + "]: "
+                    + AggregationPath.pathElementsAsStringList(path.subList(0, pathPos))
+            );
+            return;
+        }
         if (aggBuilder.get().bucketCardinality() != AggregationBuilder.BucketCardinality.MANY) {
             context.addValidationError(
-                "The first aggregation in "
+                "Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation ["
+                    + ". Path must include a multi-bucket aggregation for aggregation ["
                     + name
                     + "] found :"
                     + aggBuilder.get().getClass().getName()

+ 71 - 4
server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java

@@ -10,16 +10,20 @@ package org.elasticsearch.search.aggregations.pipeline;
 
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.support.AggregationPath;
 
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.search.aggregations.support.AggregationPath.pathElementContainsBucketKey;
+
 /**
  * A class of sibling pipeline aggregations which calculate metrics across the
  * buckets of a sibling aggregation
@@ -44,11 +48,74 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
     @Override
     public final InternalAggregation doReduce(Aggregations aggregations, AggregationReduceContext context) {
         preCollection();
-        List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
+        List<AggregationPath.PathElement> parsedPath = AggregationPath.parse(bucketsPaths()[0]).getPathElements();
         for (Aggregation aggregation : aggregations) {
-            if (aggregation.getName().equals(bucketsPath.get(0))) {
-                List<String> sublistedPath = bucketsPath.subList(1, bucketsPath.size());
-                InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) aggregation;
+            // Now that we have found the first agg in the path, resolve to the first non-qualified multi-bucket path
+            if (aggregation.getName().equals(parsedPath.get(0).name())) {
+                int currElement = 0;
+                Aggregation currentAgg = aggregation;
+                while (currElement < parsedPath.size() - 1) {
+                    if (currentAgg == null) {
+                        throw new IllegalArgumentException(
+                            "bucket_path ["
+                                + bucketsPaths()[0]
+                                + "] expected aggregation with name ["
+                                + parsedPath.get(currElement).name()
+                                + "] but was missing in search response"
+                        );
+                    }
+                    if (currentAgg instanceof InternalSingleBucketAggregation singleBucketAggregation) {
+                        currentAgg = singleBucketAggregation.getAggregations().get(parsedPath.get(++currElement).name());
+                    } else if (pathElementContainsBucketKey(parsedPath.get(currElement))) {
+                        if (currentAgg instanceof InternalMultiBucketAggregation<?, ?> multiBucketAggregation) {
+                            InternalMultiBucketAggregation.InternalBucket bucket =
+                                (InternalMultiBucketAggregation.InternalBucket) multiBucketAggregation.getProperty(
+                                    parsedPath.get(currElement).key()
+                                );
+                            if (bucket == null) {
+                                throw new AggregationExecutionException(
+                                    "missing bucket ["
+                                        + parsedPath.get(currElement).key()
+                                        + "] for agg ["
+                                        + currentAgg.getName()
+                                        + "] while extracting bucket path ["
+                                        + bucketsPaths()[0]
+                                        + "]"
+                                );
+                            }
+                            if (currElement == parsedPath.size() - 1) {
+                                throw new AggregationExecutionException(
+                                    "invalid bucket path ends at [" + parsedPath.get(currElement).key() + "]"
+                                );
+                            }
+                            currentAgg = bucket.getAggregations().get(parsedPath.get(++currElement).name());
+                        } else {
+                            throw new AggregationExecutionException(
+                                "bucket_path ["
+                                    + bucketsPaths()[0]
+                                    + "] indicates bucket_key ["
+                                    + parsedPath.get(currElement).key()
+                                    + "] at position ["
+                                    + currElement
+                                    + "] but encountered on agg ["
+                                    + currentAgg.getName()
+                                    + "] which is not a multi_bucket aggregation"
+                            );
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                if (currentAgg instanceof InternalMultiBucketAggregation == false) {
+                    String msg = currentAgg == null
+                        ? "did not find multi-bucket aggregation for extraction."
+                        : "did not find multi-bucket aggregation for extraction. Found [" + currentAgg.getName() + "]";
+                    throw new AggregationExecutionException(msg);
+                }
+                List<String> sublistedPath = AggregationPath.pathElementsAsStringList(parsedPath.subList(currElement, parsedPath.size()));
+                // First element is the current agg, so we want the rest of the path
+                sublistedPath = sublistedPath.subList(1, sublistedPath.size());
+                InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) currentAgg;
                 List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = multiBucketsAgg.getBuckets();
                 for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
                     Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, sublistedPath, gapPolicy);

+ 28 - 11
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java

@@ -68,6 +68,33 @@ public class AggregationPath {
 
     private static final String AGG_DELIM = ">";
 
+    /**
+     * Indicates if the current path element contains a bucket key.
+     *
+     * InternalMultiBucketAggregation#resolvePropertyFromPath supports resolving specific buckets and a bucket is indicated by
+     * wrapping a key element in quotations. Example `agg['foo']` would get the bucket `foo` in the agg.
+     *
+     * @param pathElement The path element to check
+     * @return Does the path element contain a bucket_key or not
+     */
+    public static boolean pathElementContainsBucketKey(AggregationPath.PathElement pathElement) {
+        return pathElement != null && pathElement.key() != null && pathElement.key().startsWith("'") && pathElement.key().endsWith("'");
+    }
+
+    public static List<String> pathElementsAsStringList(List<PathElement> pathElements) {
+        List<String> stringPathElements = new ArrayList<>();
+        for (PathElement pathElement : pathElements) {
+            stringPathElements.add(pathElement.name);
+            if (pathElement.key != null) {
+                stringPathElements.add(pathElement.key);
+            }
+            if (pathElement.metric != null) {
+                stringPathElements.add(pathElement.metric);
+            }
+        }
+        return stringPathElements;
+    }
+
     public static AggregationPath parse(String path) {
         String[] elements = Strings.tokenizeToStringArray(path, AGG_DELIM);
         List<PathElement> tokens = new ArrayList<>(elements.length);
@@ -181,17 +208,7 @@ public class AggregationPath {
     }
 
     public List<String> getPathElementsAsStringList() {
-        List<String> stringPathElements = new ArrayList<>();
-        for (PathElement pathElement : this.pathElements) {
-            stringPathElements.add(pathElement.name);
-            if (pathElement.key != null) {
-                stringPathElements.add(pathElement.key);
-            }
-            if (pathElement.metric != null) {
-                stringPathElements.add(pathElement.metric);
-            }
-        }
-        return stringPathElements;
+        return pathElementsAsStringList(this.pathElements);
     }
 
     /**

+ 97 - 0
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.pipeline;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
@@ -17,17 +18,24 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
+import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.InternalAvg;
 
@@ -119,6 +127,95 @@ public class AvgBucketAggregatorTests extends AggregatorTestCase {
         }
     }
 
+    public void testComplicatedBucketPath() throws IOException {
+        Query query = new MatchAllDocsQuery();
+        final String textField = "text";
+        AvgAggregationBuilder avgBuilder = new AvgAggregationBuilder("foo").field(VALUE_FIELD);
+        DateHistogramAggregationBuilder histo = new DateHistogramAggregationBuilder("histo").calendarInterval(DateHistogramInterval.YEAR)
+            .field(DATE_FIELD)
+            .subAggregation(new AvgAggregationBuilder("foo").field(VALUE_FIELD));
+        TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field(textField).subAggregation(histo);
+        FilterAggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filter", QueryBuilders.matchAllQuery())
+            .subAggregation(termsBuilder);
+        AvgBucketPipelineAggregationBuilder avgBucketBuilder = new AvgBucketPipelineAggregationBuilder(
+            "the_avg_bucket",
+            "filter>terms['value']>histo>foo"
+        );
+
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                Document document = new Document();
+                for (String date : dataset) {
+                    if (frequently()) {
+                        indexWriter.commit();
+                    }
+
+                    document.add(new SortedNumericDocValuesField(DATE_FIELD, asLong(date)));
+                    document.add(new SortedNumericDocValuesField(VALUE_FIELD, randomInt()));
+                    document.add(new SortedSetDocValuesField(textField, new BytesRef("value")));
+                    document.add(
+                        new KeywordFieldMapper.KeywordField(textField, new BytesRef("value"), KeywordFieldMapper.Defaults.FIELD_TYPE)
+                    );
+                    indexWriter.addDocument(document);
+                    document.clear();
+                }
+            }
+
+            InternalAvg avgResult;
+            InternalDateHistogram histogramResult;
+            InternalFilter filterResult;
+            InternalTerms<?, ?> internalTerms;
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                DateFieldMapper.DateFieldType fieldType = new DateFieldMapper.DateFieldType(DATE_FIELD);
+                MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD, NumberFieldMapper.NumberType.LONG);
+                MappedFieldType keywordField = keywordField(textField);
+
+                avgResult = searchAndReduce(
+                    indexSearcher,
+                    query,
+                    avgBuilder,
+                    10000,
+                    new MappedFieldType[] { fieldType, valueFieldType, keywordField }
+                );
+                histogramResult = searchAndReduce(
+                    indexSearcher,
+                    query,
+                    histo,
+                    10000,
+                    new MappedFieldType[] { fieldType, valueFieldType, keywordField }
+                );
+                internalTerms = searchAndReduce(
+                    indexSearcher,
+                    query,
+                    termsBuilder,
+                    10000,
+                    new MappedFieldType[] { fieldType, valueFieldType, keywordField }
+                );
+                filterResult = searchAndReduce(
+                    indexSearcher,
+                    query,
+                    filterAggregationBuilder,
+                    10000,
+                    new MappedFieldType[] { fieldType, valueFieldType, keywordField }
+                );
+            }
+
+            // Finally, reduce the pipeline agg
+            PipelineAggregator avgBucketAgg = avgBucketBuilder.createInternal(Collections.emptyMap());
+            List<Aggregation> reducedAggs = new ArrayList<>(4);
+
+            reducedAggs.add(filterResult);
+            reducedAggs.add(internalTerms);
+            reducedAggs.add(histogramResult);
+            reducedAggs.add(avgResult);
+            Aggregations aggregations = new Aggregations(reducedAggs);
+            InternalAggregation pipelineResult = ((AvgBucketPipelineAggregator) avgBucketAgg).doReduce(aggregations, null);
+            assertNotNull(pipelineResult);
+        }
+    }
+
     private static long asLong(String dateTime) {
         return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
     }

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketTests.java

@@ -47,9 +47,9 @@ public class AvgBucketTests extends AbstractBucketMetricsTestCase<AvgBucketPipel
         assertThat(
             validate(aggBuilders, new AvgBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketTests.java

@@ -71,9 +71,9 @@ public class ExtendedStatsBucketTests extends AbstractBucketMetricsTestCase<Exte
         assertThat(
             validate(aggBuilders, new ExtendedStatsBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketTests.java

@@ -47,9 +47,9 @@ public class MaxBucketTests extends AbstractBucketMetricsTestCase<MaxBucketPipel
         assertThat(
             validate(aggBuilders, new MaxBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MinBucketTests.java

@@ -47,9 +47,9 @@ public class MinBucketTests extends AbstractBucketMetricsTestCase<MinBucketPipel
         assertThat(
             validate(aggBuilders, new MinBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketTests.java

@@ -74,9 +74,9 @@ public class PercentilesBucketTests extends AbstractBucketMetricsTestCase<Percen
         assertThat(
             validate(aggBuilders, new PercentilesBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketTests.java

@@ -47,9 +47,9 @@ public class StatsBucketTests extends AbstractBucketMetricsTestCase<StatsBucketP
         assertThat(
             validate(aggBuilders, new StatsBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/SumBucketTests.java

@@ -47,9 +47,9 @@ public class SumBucketTests extends AbstractBucketMetricsTestCase<SumBucketPipel
         assertThat(
             validate(aggBuilders, new SumBucketPipelineAggregationBuilder("name", "global>metric")),
             equalTo(
-                "Validation Failed: 1: The first aggregation in "
+                "Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
                     + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
-                    + " must be a multi-bucket aggregation for aggregation [name] found :"
+                    + ". Path must include a multi-bucket aggregation for aggregation [name] found :"
                     + GlobalAggregationBuilder.class.getName()
                     + " for buckets path: global>metric;"
             )

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/correlation/BucketCorrelationAggregationBuilderTests.java

@@ -80,7 +80,7 @@ public class BucketCorrelationAggregationBuilderTests extends BasePipelineAggreg
                     new CountCorrelationFunction(CountCorrelationIndicatorTests.randomInstance())
                 )
             ),
-            containsString("must be a multi-bucket aggregation for aggregation")
+            containsString("Unable to find unqualified multi-bucket aggregation in buckets_path")
         );
     }
 

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/kstest/BucketCountKSTestAggregationBuilderTests.java

@@ -89,7 +89,7 @@ public class BucketCountKSTestAggregationBuilderTests extends BasePipelineAggreg
                     new SamplingMethod.UpperTail()
                 )
             ),
-            containsString("must be a multi-bucket aggregation for aggregation")
+            containsString("Unable to find unqualified multi-bucket aggregation in buckets_path")
         );
     }