Browse Source

[Transform] add support for top metrics (#71850)

add support for the stats and top metrics aggregation in transform. With this change it became
easier to add more multi value aggregations to transform

Limitations:
 - only the 1st element of top_metrics gets consumed by transform[*].
 - all values of stats will be mapped to double if mapping deduction is used, including count,
   sum, min, max

fixes #52236
relates #51925
Hendrik Muhs 4 years ago
parent
commit
7fff5df7a3

+ 2 - 0
docs/reference/rest-api/common-parms.asciidoc

@@ -716,8 +716,10 @@ currently supported:
 * <<search-aggregations-metrics-percentile-aggregation,Percentiles>>
 * <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
 * <<search-aggregations-metrics-scripted-metric-aggregation,Scripted metric>>
+* <<search-aggregations-metrics-stats-aggregation,Stats>>
 * <<search-aggregations-metrics-sum-aggregation,Sum>>
 * <<search-aggregations-bucket-terms-aggregation, Terms>>
+* <<search-aggregations-metrics-top-metrics,Top Metrics>>
 * <<search-aggregations-metrics-valuecount-aggregation,Value count>>
 * <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
 

+ 51 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -1412,6 +1412,57 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertEquals(4.47169811, actual.doubleValue(), 0.000001);
     }
 
+    public void testPivotWithTopMetrics() throws Exception {
+        String transformId = "top_metrics_transform";
+        String transformIndex = "top_metrics_pivot_reviews";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
+
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"top_business\": {"
+            + "       \"top_metrics\": {"
+            + "         \"metrics\": {\"field\": \"business_id\"},"
+            + "         \"sort\": {\"timestamp\": \"desc\"}"
+            + "} } } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
+        assertTrue(indexExists(transformIndex));
+
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
+        assertEquals("business_9", actual);
+
+        searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_1");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
+        assertEquals("business_3", actual);
+    }
+
     public void testManyBucketsWithSmallPageSize() throws Exception {
         String transformId = "test_with_many_buckets";
         String transformIndex = transformId + "-idx";

+ 4 - 1
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -276,7 +276,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
             + "     \"affiliate_missing\": {"
             + "       \"missing\": {"
             + "         \"field\": \"affiliate_id\""
-
+            + " } },"
+            + "     \"stats\": {"
+            + "       \"stats\": {"
+            + "         \"field\": \"stars\""
             + " } } } },"
             + "\"frequency\":\"1s\""
             + "}";

+ 53 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

@@ -25,6 +25,8 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
 import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
 import org.elasticsearch.search.aggregations.metrics.GeoBounds;
 import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
+import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.MultiValue;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
 import org.elasticsearch.search.aggregations.metrics.Percentile;
 import org.elasticsearch.search.aggregations.metrics.Percentiles;
@@ -63,6 +65,8 @@ public final class AggregationResultUtils {
         tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
         tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
         tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
+        tempMap.put(MultiValue.class.getName(), new NumericMultiValueAggExtractor());
+        tempMap.put(MultiValueAggregation.class.getName(), new MultiValueAggExtractor());
         TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
     }
 
@@ -151,8 +155,14 @@ public final class AggregationResultUtils {
             return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
         } else if (aggregation instanceof GeoBounds) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
+            // note: percentiles is also a multi value agg, therefore check percentiles first
+            // TODO: can the Percentiles extractor be removed?
         } else if (aggregation instanceof Percentiles) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
+        } else if (aggregation instanceof MultiValue) {
+            return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValue.class.getName());
+        } else if (aggregation instanceof MultiValueAggregation) {
+            return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValueAggregation.class.getName());
         } else if (aggregation instanceof SingleBucketAggregation) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
         } else if (aggregation instanceof MultiBucketsAggregation) {
@@ -259,6 +269,44 @@ public final class AggregationResultUtils {
         }
     }
 
+    static class MultiValueAggExtractor implements AggValueExtractor {
+        @Override
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
+            MultiValueAggregation aggregation = (MultiValueAggregation) agg;
+            Map<String, Object> extracted = new HashMap<>();
+            for (String valueName : aggregation.valueNames()) {
+                List<String> valueAsStrings = aggregation.getValuesAsStrings(valueName);
+
+                // todo: size > 1 is not supported, requires a refactoring so that `size()` is exposed in the agg builder
+                if (valueAsStrings.size() > 0) {
+                    extracted.put(valueName, valueAsStrings.get(0));
+                }
+            }
+
+            return extracted;
+        }
+    }
+
+    static class NumericMultiValueAggExtractor implements AggValueExtractor {
+        @Override
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
+            MultiValue aggregation = (MultiValue) agg;
+            Map<String, Object> extracted = new HashMap<>();
+
+            String fieldLookupPrefix = (lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()) + ".";
+            for (String valueName : aggregation.valueNames()) {
+                double value = aggregation.value(valueName);
+
+                String fieldType = fieldTypeMap.get(fieldLookupPrefix + valueName);
+                if (Numbers.isValidDouble(value)) {
+                    extracted.put(valueName, dropFloatingPointComponentIfTypeRequiresIt(fieldType, value));
+                }
+            }
+
+            return extracted;
+        }
+    }
+
     static class PercentilesAggExtractor implements AggValueExtractor {
         @Override
         public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
@@ -402,12 +450,11 @@ public final class AggregationResultUtils {
 
         @Override
         public Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
-            assert aggregation instanceof GeoShapeMetricAggregation
-                 : "Unexpected type ["
-                        + aggregation.getClass().getName()
-                        + "] for aggregation ["
-                        + aggregation.getName()
-                        + "]";
+            assert aggregation instanceof GeoShapeMetricAggregation : "Unexpected type ["
+                + aggregation.getClass().getName()
+                + "] for aggregation ["
+                + aggregation.getName()
+                + "]";
             return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry();
         }
     }

+ 58 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -73,10 +74,8 @@ public final class TransformAggregations {
         "sampler",
         "significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
         "significant_text",
-        "stats", // https://github.com/elastic/elasticsearch/issues/51925
         "string_stats", // https://github.com/elastic/elasticsearch/issues/51925
         "top_hits",
-        "top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
         "t_test", // https://github.com/elastic/elasticsearch/issues/54503,
         "variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
         "rate", // https://github.com/elastic/elasticsearch/issues/61351
@@ -113,7 +112,9 @@ public final class TransformAggregations {
         FILTER("filter", LONG),
         TERMS("terms", FLATTENED),
         RARE_TERMS("rare_terms", FLATTENED),
-        MISSING("missing", LONG);
+        MISSING("missing", LONG),
+        TOP_METRICS("top_metrics", SOURCE),
+        STATS("stats", DOUBLE);
 
         private final String aggregationType;
         private final String targetMapping;
@@ -175,7 +176,32 @@ public final class TransformAggregations {
         return agg.getTargetMapping();
     }
 
+    /**
+     * Checks the aggregation object and returns a tuple with 2 maps:
+     *
+     * 1. mapping the name of the agg to the used field
+     * 2. mapping the name of the agg to the aggregation type
+     *
+     * Example:
+     * {
+     *   "my_agg": {
+     *     "max": {
+     *       "field": "my_field"
+     * }}}
+     *
+     * creates ({ "my_agg": "my_field" }, { "my_agg": "max" })
+     *
+     * Both mappings can contain _multiple_ entries, e.g. due to sub aggregations or because of aggregations creating multiple
+     * values(e.g. percentiles)
+     *
+     * Note about order: aggregation can hit in multiple places (e.g. a multi value agg implement {@link ValuesSourceAggregationBuilder})
+     * Be careful changing the order in this method
+     *
+     * @param agg the aggregation builder
+     * @return a tuple with 2 mappings that maps the used field(s) and aggregation type(s)
+     */
     public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
+        // todo: can this be removed?
         if (agg instanceof PercentilesAggregationBuilder) {
             PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
 
@@ -185,7 +211,34 @@ public final class TransformAggregations {
                 Collections.emptyMap(),
                 Arrays.stream(percentilesAgg.percentiles())
                     .mapToObj(OutputFieldNameConverter::fromDouble)
-                    .collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
+                    .collect(
+                        Collectors.toMap(p -> percentilesAgg.getName() + "." + p, p -> { return percentilesAgg.getType(); }, (p1, p2) -> p1)
+                    )
+            );
+        }
+
+        // does the agg specify output field names
+        Optional<Set<String>> outputFieldNames = agg.getOutputFieldNames();
+        if (outputFieldNames.isPresent()) {
+            return new Tuple<>(
+                outputFieldNames.get()
+                    .stream()
+                    .collect(
+                        Collectors.toMap(
+                            outputField -> agg.getName() + "." + outputField,
+                            outputField -> outputField,
+                            (v1, v2) -> v1
+                        )
+                    ),
+                outputFieldNames.get()
+                    .stream()
+                    .collect(
+                        Collectors.toMap(
+                            outputField -> agg.getName() + "." + outputField,
+                            outputField -> agg.getType(),
+                            (v1, v2) -> v1
+                        )
+                    )
             );
         }
 
@@ -193,7 +246,7 @@ public final class TransformAggregations {
             ValuesSourceAggregationBuilder<?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?>) agg;
             return new Tuple<>(
                 Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
-                Collections.singletonMap(agg.getName(), agg.getType())
+                Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.getType())
             );
         }
 

+ 154 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
 
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ContextParser;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -20,6 +21,7 @@ import org.elasticsearch.script.Script;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
 import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
@@ -36,6 +38,8 @@ import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuild
 import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.GeoBounds;
 import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
+import org.elasticsearch.search.aggregations.metrics.InternalMultiValueAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -77,6 +81,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
@@ -116,6 +121,96 @@ public class AggregationResultUtilsTests extends ESTestCase {
             .collect(Collectors.toList());
     }
 
+    class TestMultiValueAggregation extends InternalMultiValueAggregation {
+
+        private final Map<String, String> values;
+
+        TestMultiValueAggregation(String name, Map<String, String> values) {
+            super(name, emptyMap());
+            this.values = values;
+        }
+
+        @Override
+        public String getWriteableName() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<String> getValuesAsStrings(String name) {
+            return Collections.singletonList(values.get(name).toString());
+        }
+
+        @Override
+        protected void doWriteTo(StreamOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterable<String> valueNames() {
+            return values.keySet();
+        }
+
+        @Override
+        protected boolean mustReduceOnSingleInternalAgg() {
+            return false;
+        }
+
+        @Override
+        public Object getProperty(List<String> path) {
+            return null;
+        }
+    }
+
+    class TestNumericMultiValueAggregation extends InternalNumericMetricsAggregation.MultiValue {
+
+        private final Map<String, Double> values;
+
+        TestNumericMultiValueAggregation(String name, Map<String, Double> values) {
+            super(name, emptyMap());
+            this.values = values;
+        }
+
+        @Override
+        public String getWriteableName() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public double value(String name) {
+            return values.get(name);
+        }
+
+        @Override
+        protected void doWriteTo(StreamOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterable<String> valueNames() {
+            return values.keySet();
+        }
+    }
+
     @Override
     protected NamedXContentRegistry xContentRegistry() {
         return namedXContentRegistry;
@@ -664,6 +759,65 @@ public class AggregationResultUtilsTests extends ESTestCase {
         );
     }
 
+    public void testMultiValueAggExtractor() {
+        Aggregation agg = new TestMultiValueAggregation("mv_metric", Collections.singletonMap("ip", "192.168.1.1"));
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("mv_metric.ip", "ip"), ""),
+            equalTo(Collections.singletonMap("ip", "192.168.1.1"))
+        );
+
+        agg = new TestMultiValueAggregation("mv_metric", Collections.singletonMap("top_answer", "fortytwo"));
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("mv_metric.written_answer", "written_answer"), ""),
+            equalTo(Collections.singletonMap("top_answer", "fortytwo"))
+        );
+
+        agg = new TestMultiValueAggregation("mv_metric", Map.of("ip", "192.168.1.1", "top_answer", "fortytwo"));
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg).value(agg, Map.of("mv_metric.top_answer", "keyword", "mv_metric.ip", "ip"), ""),
+            equalTo(Map.of("top_answer", "fortytwo", "ip", "192.168.1.1"))
+        );
+    }
+
+    public void testNumericMultiValueAggExtractor() {
+        Aggregation agg = new TestNumericMultiValueAggregation(
+            "mv_metric",
+            Collections.singletonMap("approx_answer", Double.valueOf(42.2))
+        );
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("mv_metric.approx_answer", "double"), ""),
+            equalTo(Collections.singletonMap("approx_answer", Double.valueOf(42.2)))
+        );
+
+        agg = new TestNumericMultiValueAggregation("mv_metric", Collections.singletonMap("exact_answer", Double.valueOf(42.0)));
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("mv_metric.exact_answer", "long"), ""),
+            equalTo(Collections.singletonMap("exact_answer", Long.valueOf(42)))
+        );
+
+        agg = new TestNumericMultiValueAggregation(
+            "mv_metric",
+            Map.of("approx_answer", Double.valueOf(42.2), "exact_answer", Double.valueOf(42.0))
+        );
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg)
+                .value(agg, Map.of("mv_metric.approx_answer", "double", "mv_metric.exact_answer", "long"), ""),
+            equalTo(Map.of("approx_answer", Double.valueOf(42.2), "exact_answer", Long.valueOf(42)))
+        );
+
+        assertThat(
+            AggregationResultUtils.getExtractor(agg)
+                .value(agg, Map.of("filter.mv_metric.approx_answer", "double", "filter.mv_metric.exact_answer", "long"), "filter"),
+            equalTo(Map.of("approx_answer", Double.valueOf(42.2), "exact_answer", Long.valueOf(42)))
+        );
+    }
+
     private ScriptedMetric createScriptedMetric(Object returnValue) {
         ScriptedMetric agg = mock(ScriptedMetric.class);
         when(agg.aggregation()).thenReturn(returnValue);

+ 14 - 12
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

@@ -48,6 +48,7 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -68,10 +69,14 @@ public class PivotTests extends ESTestCase {
     private NamedXContentRegistry namedXContentRegistry;
     private Client client;
 
+    // exclude aggregations from the analytics module as we don't have parser for it here
+    private final Set<String> externalAggregations = Collections.singleton("top_metrics");
+
     private final Set<String> supportedAggregations = Stream.of(AggregationType.values())
         .map(AggregationType::getName)
+        .filter(agg -> externalAggregations.contains(agg) == false)
         .collect(Collectors.toSet());
-    private final String[] unsupportedAggregations = { "stats" };
+    private final String[] unsupportedAggregations = { "global" };
 
     @Before
     public void registerAggregationNamedObjects() throws Exception {
@@ -148,11 +153,7 @@ public class PivotTests extends ESTestCase {
         for (String agg : supportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Function pivot = new Pivot(
-                getValidPivotConfig(aggregationConfig),
-                new SettingsConfig(),
-                Version.CURRENT
-            );
+            Function pivot = new Pivot(getValidPivotConfig(aggregationConfig), new SettingsConfig(), Version.CURRENT);
             assertValidTransform(client, source, pivot);
         }
     }
@@ -161,11 +162,7 @@ public class PivotTests extends ESTestCase {
         for (String agg : unsupportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Function pivot = new Pivot(
-                getValidPivotConfig(aggregationConfig),
-                new SettingsConfig(),
-                Version.CURRENT
-            );
+            Function pivot = new Pivot(getValidPivotConfig(aggregationConfig), new SettingsConfig(), Version.CURRENT);
 
             pivot.validateConfig(ActionListener.wrap(r -> { fail("expected an exception but got a response"); }, e -> {
                 assertThat(e, anyOf(instanceOf(ElasticsearchException.class)));
@@ -179,7 +176,7 @@ public class PivotTests extends ESTestCase {
             + "\"group-A\": { \"terms\": { \"field\": \"field-A\" } },"
             + "\"group-B\": { \"terms\": { \"field\": \"field-B\" } },"
             + "\"group-C\": { \"terms\": { \"field\": \"field-C\" } }"
-        + "}";
+            + "}";
         GroupConfig groupConfig;
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, groupConfigJson)) {
             groupConfig = GroupConfig.fromXContent(parser, false);
@@ -311,6 +308,11 @@ public class PivotTests extends ESTestCase {
                 "{\"pivot_geo_line\": {\"geo_line\": {\"point\": {\"field\": \"values\"}, \"sort\":{\"field\": \"timestamp\"}}}}"
             );
         }
+        if (agg.equals("global")) {
+            return parseAggregations(
+                "{\"pivot_global\": {\"global\": {}}}"
+            );
+        }
 
         return parseAggregations(
             "{\n" + "  \"pivot_" + agg + "\": {\n" + "    \"" + agg + "\": {\n" + "      \"field\": \"values\"\n" + "    }\n" + "  }" + "}"

+ 26 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregationsTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
 
@@ -110,6 +111,16 @@ public class TransformAggregationsTests extends ESTestCase {
         assertEquals("flattened", TransformAggregations.resolveTargetMapping("rare_terms", "text"));
         assertEquals("flattened", TransformAggregations.resolveTargetMapping("rare_terms", "keyword"));
 
+        // top_metrics
+        assertEquals("int", TransformAggregations.resolveTargetMapping("top_metrics", "int"));
+        assertEquals("double", TransformAggregations.resolveTargetMapping("top_metrics", "double"));
+        assertEquals("ip", TransformAggregations.resolveTargetMapping("top_metrics", "ip"));
+        assertEquals("keyword", TransformAggregations.resolveTargetMapping("top_metrics", "keyword"));
+
+        // stats
+        assertEquals("double", TransformAggregations.resolveTargetMapping("stats", null));
+        assertEquals("double", TransformAggregations.resolveTargetMapping("stats", "int"));
+
         // corner case: source type null
         assertEquals(null, TransformAggregations.resolveTargetMapping("min", null));
     }
@@ -167,6 +178,21 @@ public class TransformAggregationsTests extends ESTestCase {
         assertEquals("percentiles", outputTypes.get("percentiles.10"));
     }
 
+    public void testGetAggregationOutputTypesStats() {
+        AggregationBuilder statsAggregationBuilder = new StatsAggregationBuilder("stats");
+
+        Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(
+            statsAggregationBuilder
+        );
+        Map<String, String> outputTypes = inputAndOutputTypes.v2();
+        assertEquals(5, outputTypes.size());
+        assertEquals("stats", outputTypes.get("stats.max"));
+        assertEquals("stats", outputTypes.get("stats.min"));
+        assertEquals("stats", outputTypes.get("stats.avg"));
+        assertEquals("stats", outputTypes.get("stats.count"));
+        assertEquals("stats", outputTypes.get("stats.sum"));
+    }
+
     public void testGetAggregationOutputTypesSubAggregations() {
 
         AggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filter_1", new TermQueryBuilder("type", "cat"));