Ver Fonte

Support global ords in top_metrics (#64967)

Adds support for fetching `keyword` and `ip` fields in `top_metrics`.
This only works if we can get global ordinals for those fields so we
can't support the entire `keyword` family, but this gets us *somewhere*.

Closes #64774
Nik Everett há 4 anos atrás
pai
commit
450fb552b4

+ 3 - 3
server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java

@@ -171,7 +171,7 @@ public abstract class BucketedSort implements Releasable {
      */
     @FunctionalInterface
     public interface ResultBuilder<T> {
-        T build(long index, SortValue sortValue);
+        T build(long index, SortValue sortValue) throws IOException;
     }
 
     /**
@@ -180,7 +180,7 @@ public abstract class BucketedSort implements Releasable {
      * @param builder builds results. See {@link ExtraData} for how to store
      *                data along side the sort for this to extract.
      */
-    public final <T extends Comparable<T>> List<T> getValues(long bucket, ResultBuilder<T> builder) {
+    public final <T extends Comparable<T>> List<T> getValues(long bucket, ResultBuilder<T> builder) throws IOException {
         long rootIndex = bucket * bucketSize;
         if (rootIndex >= values().size()) {
             // We've never seen this bucket.
@@ -201,7 +201,7 @@ public abstract class BucketedSort implements Releasable {
      * Get the values for a bucket if it has been collected. If it hasn't
      * then returns an empty array.
      */
-    public final List<SortValue> getValues(long bucket) {
+    public final List<SortValue> getValues(long bucket) throws IOException {
         return getValues(bucket, (i, sv) -> sv);
     }
 

+ 89 - 2
server/src/main/java/org/elasticsearch/search/sort/SortValue.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.search.sort;
 
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -48,13 +50,22 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
         return new LongSortValue(l);
     }
 
+    /**
+     * Get a {@linkplain SortValue} for bytes. Callers should be sure that they
+     * have a {@link BytesRef#deepCopyOf} of any mutable references.
+     */
+    public static SortValue from(BytesRef bytes) {
+        return new BytesSortValue(bytes);
+    }
+
     /**
      * Get the list of {@linkplain NamedWriteable}s that this class needs.
      */
     public static List<NamedWriteableRegistry.Entry> namedWriteables() {
         return Arrays.asList(
                 new NamedWriteableRegistry.Entry(SortValue.class, DoubleSortValue.NAME, DoubleSortValue::new),
-                new NamedWriteableRegistry.Entry(SortValue.class, LongSortValue.NAME, LongSortValue::new));
+                new NamedWriteableRegistry.Entry(SortValue.class, LongSortValue.NAME, LongSortValue::new),
+                new NamedWriteableRegistry.Entry(SortValue.class, BytesSortValue.NAME, BytesSortValue::new));
     }
 
     private SortValue() {
@@ -200,7 +211,7 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
             this.key = key;
         }
 
-        LongSortValue(StreamInput in) throws IOException {
+        private LongSortValue(StreamInput in) throws IOException {
             key = in.readLong();
         }
 
@@ -259,4 +270,80 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
             return key;
         }
     }
+
+    private static class BytesSortValue extends SortValue {
+        public static final String NAME = "bytes";
+
+        private final BytesRef key;
+
+        BytesSortValue(BytesRef key) {
+            this.key = key;
+        }
+
+        private BytesSortValue(StreamInput in) throws IOException {
+            key = in.readBytesRef();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            if (out.getVersion().before(Version.V_7_11_0)) {
+                throw new IllegalArgumentException(
+                    "versions of Elasticsearch before 7.11.0 can't handle non-numeric sort values and attempted to send to ["
+                        + out.getVersion()
+                        + "]"
+                );
+            }
+            out.writeBytesRef(key);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return NAME;
+        }
+
+        @Override
+        public Object getKey() {
+            return key;
+        }
+
+        @Override
+        public String format(DocValueFormat format) {
+            return format.format(key).toString();
+        }
+
+        @Override
+        protected XContentBuilder rawToXContent(XContentBuilder builder) throws IOException {
+            return builder.value(key.utf8ToString());
+        }
+
+        @Override
+        protected int compareToSameType(SortValue obj) {
+            BytesSortValue other = (BytesSortValue) obj;
+            return key.compareTo(other.key);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null || false == getClass().equals(obj.getClass())) {
+                return false;
+            }
+            BytesSortValue other = (BytesSortValue) obj;
+            return key.equals(other.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return key.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return key.toString();
+        }
+
+        @Override
+        public Number numberValue() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/search/sort/BucketedSortTestCase.java

@@ -73,7 +73,7 @@ public abstract class BucketedSortTestCase<T extends BucketedSort> extends ESTes
         return build(order, format, bucketSize, BucketedSort.NOOP_EXTRA_DATA, values);
     }
 
-    public final void testNeverCalled() {
+    public final void testNeverCalled() throws IOException {
         SortOrder order = randomFrom(SortOrder.values());
         DocValueFormat format = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
         try (T sort = build(order, format, 1, BucketedSort.NOOP_EXTRA_DATA, new double[] {})) {

+ 42 - 1
server/src/test/java/org/elasticsearch/search/sort/SortValueTests.java

@@ -19,14 +19,19 @@
 
 package org.elasticsearch.search.sort;
 
+import org.apache.lucene.document.InetAddressPoint;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.test.AbstractNamedWriteableTestCase;
+import org.elasticsearch.test.VersionUtils;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -51,7 +56,12 @@ public class SortValueTests extends AbstractNamedWriteableTestCase<SortValue> {
 
     @Override
     protected SortValue createTestInstance() {
-        return randomBoolean() ? SortValue.from(randomDouble()) : SortValue.from(randomLong());
+        switch (between(0, 2)) {
+            case 0: return SortValue.from(randomDouble());
+            case 1: return SortValue.from(randomLong());
+            case 2: return SortValue.from(new BytesRef(randomAlphaOfLength(5)));
+            default: throw new AssertionError();
+        }
     }
 
     @Override
@@ -81,11 +91,23 @@ public class SortValueTests extends AbstractNamedWriteableTestCase<SortValue> {
         assertThat(toXContent(SortValue.from(1), STRICT_DATE_TIME), equalTo("{\"test\":\"1970-01-01T00:00:00.001Z\"}"));
     }
 
+    public void testToXContentBytes() {
+        assertThat(toXContent(SortValue.from(new BytesRef("cat")), DocValueFormat.RAW), equalTo("{\"test\":\"cat\"}"));
+        assertThat(
+            toXContent(SortValue.from(new BytesRef(InetAddressPoint.encode(InetAddresses.forString("127.0.0.1")))), DocValueFormat.IP),
+            equalTo("{\"test\":\"127.0.0.1\"}")
+        );
+    }
+
     public void testCompareDifferentTypes() {
         assertThat(SortValue.from(1.0), lessThan(SortValue.from(1)));
         assertThat(SortValue.from(Double.MAX_VALUE), lessThan(SortValue.from(Long.MIN_VALUE)));
         assertThat(SortValue.from(1), greaterThan(SortValue.from(1.0)));
         assertThat(SortValue.from(Long.MIN_VALUE), greaterThan(SortValue.from(Double.MAX_VALUE)));
+        assertThat(SortValue.from(new BytesRef("cat")), lessThan(SortValue.from(1)));
+        assertThat(SortValue.from(1), greaterThan(SortValue.from(new BytesRef("cat"))));
+        assertThat(SortValue.from(new BytesRef("cat")), lessThan(SortValue.from(1.0)));
+        assertThat(SortValue.from(1.0), greaterThan(SortValue.from(new BytesRef("cat"))));
     }
 
     public void testCompareDoubles() {
@@ -102,6 +124,25 @@ public class SortValueTests extends AbstractNamedWriteableTestCase<SortValue> {
         assertThat(SortValue.from(r), greaterThan(SortValue.from(r - 1)));
     }
 
+    public void testBytes() {
+        String r = randomAlphaOfLength(5);
+        assertThat(SortValue.from(new BytesRef(r)), equalTo(SortValue.from(new BytesRef(r))));
+        assertThat(SortValue.from(new BytesRef(r)), lessThan(SortValue.from(new BytesRef(r + "with_suffix"))));
+        assertThat(SortValue.from(new BytesRef(r)), greaterThan(SortValue.from(new BytesRef(new byte[] {}))));
+    }
+
+    public void testSerializeBytesToOldVersion() {
+        SortValue value = SortValue.from(new BytesRef("can't send me!"));
+        Version version = VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_10_1);
+        Exception e = expectThrows(IllegalArgumentException.class, () -> copyInstance(value, version));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "versions of Elasticsearch before 7.11.0 can't handle non-numeric sort values and attempted to send to [" + version + "]"
+            )
+        );
+    }
+
     public String toXContent(SortValue sortValue, DocValueFormat format) {
         return Strings.toString(new ToXContentFragment() {
             @Override

+ 2 - 1
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java

@@ -114,7 +114,8 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
                 TopMetricsAggregationBuilder.NAME,
                 TopMetricsAggregationBuilder::new,
                 usage.track(AnalyticsStatsAction.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER)))
-                .addResultReader(InternalTopMetrics::new),
+                .addResultReader(InternalTopMetrics::new)
+                .setAggregatorRegistrar(TopMetricsAggregationBuilder::registerAggregators),
             new AggregationSpec(
                 TTestAggregationBuilder.NAME,
                 TTestAggregationBuilder::new,

+ 25 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java

@@ -18,7 +18,10 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry.RegistryKey;
 import org.elasticsearch.search.sort.SortBuilder;
 
 import java.io.IOException;
@@ -34,6 +37,28 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
     public static final String NAME = "top_metrics";
     public static final ParseField METRIC_FIELD = new ParseField("metrics");
 
+    static final RegistryKey<TopMetricsAggregator.MetricValuesSupplier> REGISTRY_KEY = new RegistryKey<>(
+        TopMetricsAggregationBuilder.NAME,
+        TopMetricsAggregator.MetricValuesSupplier.class
+    );
+
+    public static void registerAggregators(ValuesSourceRegistry.Builder registry) {
+        registry.registerUsage(NAME);
+        registry.register(REGISTRY_KEY, List.of(CoreValuesSourceType.NUMERIC), TopMetricsAggregator::buildNumericMetricValues, false);
+        registry.register(
+            REGISTRY_KEY,
+            List.of(CoreValuesSourceType.BOOLEAN, CoreValuesSourceType.DATE),
+            TopMetricsAggregator.LongMetricValues::new,
+            false
+        );
+        registry.register(
+            REGISTRY_KEY,
+            List.of(CoreValuesSourceType.BYTES, CoreValuesSourceType.IP),
+            TopMetricsAggregator.GlobalOrdsValues::new,
+            false
+        );
+    }
+
     /**
      * Default to returning only a single top metric.
      */

+ 136 - 68
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java

@@ -8,8 +8,10 @@ package org.elasticsearch.xpack.analytics.topmetrics;
 
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
@@ -24,6 +26,8 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortBuilder;
@@ -37,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 
 import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.REGISTRY_KEY;
 
 /**
  * Collects the {@code top_metrics} aggregation, which functions like a memory
@@ -57,17 +62,24 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
     private final BucketedSort sort;
     private final Metrics metrics;
 
-    TopMetricsAggregator(String name, SearchContext context, Aggregator parent, Map<String, Object> metadata, int size,
-            SortBuilder<?> sort, List<MetricSource> metricSources) throws IOException {
+    TopMetricsAggregator(
+        String name,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata,
+        int size,
+        SortBuilder<?> sort,
+        MetricValues[] metricValues
+    ) throws IOException {
         super(name, context, parent, metadata);
         this.size = size;
-        metrics = new Metrics(size, context.getQueryShardContext().bigArrays(), metricSources);
+        this.metrics = new TopMetricsAggregator.Metrics(metricValues);
         /*
          * If we're only collecting a single value then only provided *that*
          * value to the sort so that swaps and loads are just a little faster
          * in that *very* common case.
          */
-        BucketedSort.ExtraData values = metricSources.size() == 1 ? metrics.values[0] : metrics;
+        BucketedSort.ExtraData values = metrics.values.length == 1 ? metrics.values[0] : metrics;
         this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
     }
 
@@ -77,7 +89,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
             throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]");
         }
         for (MetricValues values : metrics.values) {
-            if (values.name().equals(name)) {
+            if (values.name.equals(name)) {
                 return true;
             }
         }
@@ -138,49 +150,11 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         Releasables.close(sort, metrics);
     }
 
-    /**
-     * Information about each metric that this {@link Aggregator} uses to
-     * load and format metric values.
-     */
-    static class MetricSource {
-        private final String name;
-        private final DocValueFormat format;
-        private final ValuesSource.Numeric valuesSource;
-
-        MetricSource(String name, DocValueFormat format, ValuesSource.Numeric valuesSource) {
-            this.name = name;
-            this.format = format;
-            this.valuesSource = valuesSource;
-        }
-
-        String getName() {
-            return name;
-        }
-
-        DocValueFormat getFormat() {
-            return format;
-        }
-    }
-
     static class Metrics implements BucketedSort.ExtraData, Releasable {
         private final MetricValues[] values;
 
-        Metrics(int size, BigArrays bigArrays, List<MetricSource> sources) {
-            values = new MetricValues[sources.size()];
-            int i = 0;
-            for (MetricSource source : sources) {
-                values[i++] = valuesFor(size, bigArrays, source);
-            }
-        }
-
-        private static MetricValues valuesFor(int size, BigArrays bigArrays, MetricSource source) {
-            if (source.valuesSource == null) {
-                return new AlwaysNullMetricValues(source);
-            }
-            if (source.valuesSource.isFloatingPoint()) {
-                return new DoubleMetricValues(size, bigArrays, source);
-            }
-            return new LongMetricValues(size, bigArrays, source);
+        Metrics(MetricValues[] values) {
+            this.values = values;
         }
 
         boolean needsScores() {
@@ -194,7 +168,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
 
         double metric(String name, long index) {
             for (MetricValues value : values) {
-                if (value.name().equals(name)) {
+                if (value.name.equals(name)) {
                     return value.doubleValue(index);
                 }
             }
@@ -212,7 +186,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         }
 
         List<String> names() {
-            return Arrays.stream(values).map(MetricValues::name).collect(toList());
+            return Arrays.stream(values).map(v -> v.name).collect(toList());
         }
 
         @Override
@@ -241,44 +215,72 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         }
     }
 
-    private abstract static class MetricValues implements BucketedSort.ExtraData, Releasable {
-        protected final MetricSource source;
+    @FunctionalInterface
+    interface MetricValuesSupplier {
+        MetricValues build(int size, BigArrays bigArrays, String name, ValuesSourceConfig config);
+    }
 
-        MetricValues(MetricSource source) {
-            this.source = source;
-        }
+    abstract static class MetricValues implements BucketedSort.ExtraData, Releasable {
+        protected final String name;
 
-        final String name() {
-            return source.name;
+        MetricValues(String name) {
+            this.name = name;
         }
 
         abstract boolean needsScores();
         abstract double doubleValue(long index);
-        abstract InternalTopMetrics.MetricValue metricValue(long index);
+        abstract InternalTopMetrics.MetricValue metricValue(long index) throws IOException;
     }
 
     private abstract static class CollectingMetricValues extends MetricValues {
         protected final BigArrays bigArrays;
+        protected final ValuesSourceConfig config;
 
-        CollectingMetricValues(BigArrays bigArrays, MetricSource source) {
-            super(source);
+        CollectingMetricValues(BigArrays bigArrays, String name, ValuesSourceConfig config) {
+            super(name);
             this.bigArrays = bigArrays;
+            this.config = config;
         }
 
         @Override
         public final boolean needsScores() {
-            return source.valuesSource.needsScores();
+            return config.getValuesSource().needsScores();
         }
     }
+    
+    static MetricValues buildMetricValues(
+        ValuesSourceRegistry registry,
+        BigArrays bigArrays,
+        int size,
+        String name,
+        ValuesSourceConfig config
+    ) {
+        if (false == config.hasValues()) {
+            // `config` doesn't have the name if the
+            return new AlwaysNullMetricValues(name);
+        }
+        MetricValuesSupplier supplier = registry.getAggregator(REGISTRY_KEY, config);
+        return supplier.build(size, bigArrays, name, config);
+    }
+
+    static MetricValues buildNumericMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) {
+        ValuesSource.Numeric numeric = (ValuesSource.Numeric) config.getValuesSource();
+        if (numeric.isFloatingPoint()) {
+            return new DoubleMetricValues(size, bigArrays, name, config);
+        }
+        return new LongMetricValues(size, bigArrays, name, config);
+    }
 
     /**
      * Loads metrics for floating point numbers.
      */
     static class DoubleMetricValues extends CollectingMetricValues {
+        private final ValuesSource.Numeric valuesSource;
         private DoubleArray values;
 
-        DoubleMetricValues(int size, BigArrays bigArrays, MetricSource source) {
-            super(bigArrays, source);
+        DoubleMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) {
+            super(bigArrays, name, config);
+            valuesSource = (ValuesSource.Numeric) config.getValuesSource();
             values = bigArrays.newDoubleArray(size, false);
         }
 
@@ -294,7 +296,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
                 // Use NaN as a sentinel for "missing"
                 return null;
             }
-            return new MetricValue(source.format, SortValue.from(value));
+            return new MetricValue(config.format(), SortValue.from(value));
         }
 
         @Override
@@ -307,7 +309,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         @Override
         public Loader loader(LeafReaderContext ctx) throws IOException {
             // TODO allow configuration of value mode
-            NumericDoubleValues metricValues = MultiValueMode.AVG.select(source.valuesSource.doubleValues(ctx));
+            NumericDoubleValues metricValues = MultiValueMode.AVG.select(valuesSource.doubleValues(ctx));
             return (index, doc) -> {
                 if (index >= values.size()) {
                     values = bigArrays.grow(values, index + 1);
@@ -328,6 +330,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
      * Loads metrics for whole numbers.
      */
     static class LongMetricValues extends CollectingMetricValues {
+        private final ValuesSource.Numeric valuesSource;
         /**
          * Tracks "missing" values in a {@link BitArray}. Unlike
          * {@link DoubleMetricValues}, we there isn't a sentinel value
@@ -338,8 +341,9 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         private final MissingHelper empty;
         private LongArray values;
 
-        LongMetricValues(int size, BigArrays bigArrays, MetricSource source) {
-            super(bigArrays, source);
+        LongMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) {
+            super(bigArrays, name, config);
+            valuesSource = (ValuesSource.Numeric) config.getValuesSource();
             empty = new MissingHelper(bigArrays);
             values = bigArrays.newLongArray(size, false);
         }
@@ -357,7 +361,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
             if (empty.isEmpty(index)) {
                 return null;
             }
-            return new MetricValue(source.format, SortValue.from(values.get(index)));
+            return new MetricValue(config.format(), SortValue.from(values.get(index)));
         }
 
         @Override
@@ -371,7 +375,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         @Override
         public Loader loader(LeafReaderContext ctx) throws IOException {
             // TODO allow configuration of value mode
-            NumericDocValues metricValues = MultiValueMode.AVG.select(source.valuesSource.longValues(ctx));
+            NumericDocValues metricValues = MultiValueMode.AVG.select(valuesSource.longValues(ctx));
             return (index, doc) -> {
                 if (false == metricValues.advanceExact(doc)) {
                     empty.markMissing(index);
@@ -391,13 +395,77 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         }
     }
 
+    /**
+     * Loads metrics for whole numbers.
+     */
+    static class GlobalOrdsValues extends CollectingMetricValues {
+        private final ValuesSource.Bytes.WithOrdinals valuesSource;
+        private SortedSetDocValues globalOrds;
+        private LongArray values;
+
+        GlobalOrdsValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) {
+            super(bigArrays, name, config);
+            if (false == config.hasGlobalOrdinals()) {
+                throw new IllegalArgumentException("top_metrics can only collect bytes that have global ordinals");
+            }
+            this.valuesSource = (ValuesSource.Bytes.WithOrdinals) config.getValuesSource();
+            values = bigArrays.newLongArray(size, false);
+        }
+
+        @Override
+        public double doubleValue(long index) {
+            throw new IllegalArgumentException("pipeline aggregations may not refer to non-numeric metrics collected by top_metrics");
+        }
+
+        @Override
+        public MetricValue metricValue(long index) throws IOException {
+            if (globalOrds == null) {
+                // We didn't collect a single segment.
+                return null;
+            }
+            long ord = values.get(index);
+            if (ord == -1) {
+                return null;
+            }
+            return new MetricValue(config.format(), SortValue.from(BytesRef.deepCopyOf(globalOrds.lookupOrd(ord))));
+        }
+
+        @Override
+        public void swap(long lhs, long rhs) {
+            long tmp = values.get(lhs);
+            values.set(lhs, values.get(rhs));
+            values.set(rhs, tmp);
+        }
+
+        @Override
+        public Loader loader(LeafReaderContext ctx) throws IOException {
+            globalOrds = valuesSource.globalOrdinalsValues(ctx);
+            // For now just return the value that sorts first.
+            return (index, doc) -> {
+                if (false == globalOrds.advanceExact(doc)) {
+                    values.set(index, -1);
+                    return;
+                }
+                if (index >= values.size()) {
+                    values = bigArrays.grow(values, index + 1);
+                }
+                values.set(index, globalOrds.nextOrd());
+            };
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(values);
+        }
+    }
+
     /**
      * {@linkplain MetricValues} implementation for unmapped fields
      * that always returns {@code null} or {@code NaN}.
      */
     static class AlwaysNullMetricValues extends MetricValues {
-        AlwaysNullMetricValues(MetricSource source) {
-            super(source);
+        AlwaysNullMetricValues(String name) {
+            super(name);
         }
 
         @Override

+ 20 - 12
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java

@@ -15,17 +15,17 @@ import org.elasticsearch.search.aggregations.CardinalityUpperBound;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
-import org.elasticsearch.search.aggregations.support.ValueType;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValues;
+import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValuesSupplier;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.REGISTRY_KEY;
 
 public class TopMetricsAggregatorFactory extends AggregatorFactory {
     /**
@@ -58,14 +58,22 @@ public class TopMetricsAggregatorFactory extends AggregatorFactory {
                     + "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey()
                     + "] index level setting.");
         }
-        List<TopMetricsAggregator.MetricSource> metricSources = metricFields.stream().map(config -> {
-                    ValuesSourceConfig resolved = ValuesSourceConfig.resolve(
-                            context, ValueType.NUMERIC,
-                            config.getFieldName(), config.getScript(), config.getMissing(), config.getTimeZone(), null,
-                        CoreValuesSourceType.NUMERIC);
-                    return new TopMetricsAggregator.MetricSource(config.getFieldName(), resolved.format(),
-                        (ValuesSource.Numeric) resolved.getValuesSource());
-                }).collect(toList());
-        return new TopMetricsAggregator(name, searchContext, parent, metadata, size, sortBuilders.get(0), metricSources);
+        MetricValues[] metricValues = new MetricValues[metricFields.size()];
+        for (int i = 0; i < metricFields.size(); i++) {
+            MultiValuesSourceFieldConfig config = metricFields.get(i);
+            ValuesSourceConfig vsConfig = ValuesSourceConfig.resolve(
+                context,
+                null,
+                config.getFieldName(),
+                config.getScript(),
+                config.getMissing(),
+                config.getTimeZone(),
+                null,
+                CoreValuesSourceType.NUMERIC
+            );
+            MetricValuesSupplier supplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, vsConfig);
+            metricValues[i] = supplier.build(size, context.bigArrays(), config.getFieldName(), vsConfig);
+        }
+        return new TopMetricsAggregator(name, searchContext, parent, metadata, size, sortBuilders.get(0), metricValues);
     }
 }

+ 174 - 72
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorMetricsTests.java

@@ -7,20 +7,29 @@
 package org.elasticsearch.xpack.analytics.topmetrics;
 
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.elasticsearch.common.CheckedBiConsumer;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.FieldContext;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
 import org.elasticsearch.search.sort.SortValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;
 import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.TopMetric;
-import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricSource;
+import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValues;
 import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.Metrics;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -38,29 +47,50 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TopMetricsAggregatorMetricsTests extends ESTestCase {
-    public void testUnmapped() throws IOException {
-        withMetric(null, (m, source) -> {
+    private static final ValuesSourceRegistry REGISTRY;
+    static {
+        ValuesSourceRegistry.Builder registry = new ValuesSourceRegistry.Builder();
+        TopMetricsAggregationBuilder.registerAggregators(registry);
+        REGISTRY = registry.build();
+    }
+
+    public void testNoNumbers() throws IOException {
+        assertNoValues(toConfig(null, CoreValuesSourceType.NUMERIC, DocValueFormat.RAW, false));
+    }
+
+    public void testNoDates() throws IOException {
+        assertNoValues(toConfig(null, CoreValuesSourceType.DATE, DocValueFormat.RAW, false));
+    }
+
+    public void testNoStrings() throws IOException {
+        assertNoValues(toConfig(null, CoreValuesSourceType.BYTES, DocValueFormat.RAW, false));
+    }
+
+    private void assertNoValues(ValuesSourceConfig config) throws IOException {
+        withMetric(config, m -> {
             // Load from doc is a noop
             m.loader(null).loadFromDoc(0, randomInt());
-            assertNullMetric(m, source, randomInt());
+            assertNullMetric(m, config, randomInt(), true);
         });
     }
 
     public void testEmptyLong() throws IOException {
         SortedNumericDocValues values = mock(SortedNumericDocValues.class);
         when(values.advanceExact(0)).thenReturn(false);
-        withMetric(valuesSource(values), (m, source) -> {
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> {
             m.loader(null).loadFromDoc(0, 0);
-            assertNullMetric(m, source, 0);
+            assertNullMetric(m, config, 0, true);
         });
     }
 
     public void testEmptyDouble() throws IOException {
         SortedNumericDoubleValues values = mock(SortedNumericDoubleValues.class);
         when(values.advanceExact(0)).thenReturn(false);
-        withMetric(valuesSource(values), (m, source) -> {
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> {
             m.loader(null).loadFromDoc(0, 0);
-            assertNullMetric(m, source, 0);
+            assertNullMetric(m, config, 0, true);
         });
     }
 
@@ -70,9 +100,10 @@ public class TopMetricsAggregatorMetricsTests extends ESTestCase {
         when(values.advanceExact(0)).thenReturn(true);
         when(values.docValueCount()).thenReturn(1);
         when(values.nextValue()).thenReturn(value);
-        withMetric(valuesSource(values), (m, source) -> {
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> {
             m.loader(null).loadFromDoc(0, 0);
-            assertMetricValue(m, 0, source, SortValue.from(value));
+            assertMetricValue(m, 0, config, SortValue.from(value), true);
         });
     }
 
@@ -82,9 +113,24 @@ public class TopMetricsAggregatorMetricsTests extends ESTestCase {
         when(values.advanceExact(0)).thenReturn(true);
         when(values.docValueCount()).thenReturn(1);
         when(values.nextValue()).thenReturn(value);
-        withMetric(valuesSource(values), (m, source) -> {
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> {
             m.loader(null).loadFromDoc(0, 0);
-            assertMetricValue(m, 0, source, SortValue.from(value));
+            assertMetricValue(m, 0, config, SortValue.from(value), true);
+        });
+    }
+
+    public void testLoadString() throws IOException {
+        BytesRef value = new BytesRef(randomAlphaOfLength(5));
+        SortedSetDocValues values = mock(SortedSetDocValues.class);
+        when(values.advanceExact(0)).thenReturn(true);
+        when(values.getValueCount()).thenReturn(1L);
+        when(values.nextOrd()).thenReturn(0L);
+        when(values.lookupOrd(0L)).thenReturn(value);
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> {
+            m.loader(null).loadFromDoc(0, 0);
+            assertMetricValue(m, 0, config, SortValue.from(value), false);
         });
     }
 
@@ -96,9 +142,8 @@ public class TopMetricsAggregatorMetricsTests extends ESTestCase {
         when(values.advanceExact(1)).thenReturn(true);
         when(values.docValueCount()).thenReturn(1);
         when(values.nextValue()).thenReturn(firstValue, secondValue);
-        withMetric(valuesSource(values), (m, source) -> {
-            assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue));
-        });
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), true));
     }
 
     public void testLoadAndSwapDouble() throws IOException {
@@ -109,105 +154,162 @@ public class TopMetricsAggregatorMetricsTests extends ESTestCase {
         when(values.advanceExact(1)).thenReturn(true);
         when(values.docValueCount()).thenReturn(1);
         when(values.nextValue()).thenReturn(firstValue, secondValue);
-        withMetric(valuesSource(values), (m, source) -> {
-            assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue));
-        });
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), true));
+    }
+
+    public void testLoadAndSwapString() throws IOException {
+        BytesRef firstValue = new BytesRef(randomAlphaOfLength(5));
+        BytesRef secondValue = new BytesRef(randomAlphaOfLength(5));
+        SortedSetDocValues values = mock(SortedSetDocValues.class);
+        when(values.advanceExact(0)).thenReturn(true);
+        when(values.advanceExact(1)).thenReturn(true);
+        when(values.getValueCount()).thenReturn(1L);
+        when(values.nextOrd()).thenReturn(0L, 1L);
+        when(values.lookupOrd(0L)).thenReturn(firstValue);
+        when(values.lookupOrd(1L)).thenReturn(secondValue);
+        ValuesSourceConfig config = toConfig(values);
+        withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), false));
     }
 
     public void testManyValues() throws IOException {
         long[] values = IntStream.range(0, between(2, 100)).mapToLong(i -> randomLong()).toArray();
-        List<ValuesSource.Numeric> valuesSources = Arrays.stream(values)
-                .mapToObj(v -> {
-                    try {
-                        SortedNumericDocValues docValues = mock(SortedNumericDocValues.class);
-                        when(docValues.advanceExact(0)).thenReturn(true);
-                        when(docValues.docValueCount()).thenReturn(1);
-                        when(docValues.nextValue()).thenReturn(v);
-                        return valuesSource(docValues);
-                    } catch (IOException e) {
-                        throw new AssertionError(e);
-                    }
-                })
-                .collect(toList());
-        withMetrics(valuesSources, (m, sources) -> {
+        List<ValuesSourceConfig> configs = Arrays.stream(values).mapToObj(v -> {
+            try {
+                SortedNumericDocValues docValues = mock(SortedNumericDocValues.class);
+                when(docValues.advanceExact(0)).thenReturn(true);
+                when(docValues.docValueCount()).thenReturn(1);
+                when(docValues.nextValue()).thenReturn(v);
+                return toConfig(docValues);
+            } catch (IOException e) {
+                throw new AssertionError(e);
+            }
+        }).collect(toList());
+        withMetrics(configs, m -> {
             m.loader(null).loadFromDoc(0, 0);
             TopMetric metric = m.resultBuilder(DocValueFormat.RAW).build(0, SortValue.from(1));
             assertThat(metric.getMetricValues(), hasSize(values.length));
             for (int i = 0; i < values.length; i++) {
-                MetricSource source = sources.get(i);
-                assertThat(m.metric(source.getName(), 0), equalTo((double) values[i]));
-                assertThat(metric.getMetricValues(),
-                        hasItem(new MetricValue(source.getFormat(), SortValue.from(values[i]))));
+                ValuesSourceConfig config = configs.get(i);
+                assertThat(m.metric(config.fieldContext().field(), 0), equalTo((double) values[i]));
+                assertThat(metric.getMetricValues(), hasItem(new MetricValue(config.format(), SortValue.from(values[i]))));
             }
         });
     }
 
-    private ValuesSource.Numeric valuesSource(SortedNumericDocValues values) throws IOException {
+    private ValuesSourceConfig toConfig(SortedNumericDocValues values) throws IOException {
         ValuesSource.Numeric source = mock(ValuesSource.Numeric.class);
         when(source.isFloatingPoint()).thenReturn(false);
         when(source.longValues(null)).thenReturn(values);
-        return source;
+        if (randomBoolean()) {
+            return toConfig(source, CoreValuesSourceType.NUMERIC, randomWholeNumberDocValuesFormat(), true);
+        }
+        DocValueFormat dateFormatter = new DocValueFormat.DateTime(
+            DateFormatter.forPattern(randomDateFormatterPattern()),
+            ZoneOffset.UTC,
+            DateFieldMapper.Resolution.MILLISECONDS
+        );
+        return toConfig(source, CoreValuesSourceType.DATE, randomFrom(DocValueFormat.RAW, dateFormatter), true);
     }
 
-    private ValuesSource.Numeric valuesSource(SortedNumericDoubleValues values) throws IOException {
+    private ValuesSourceConfig toConfig(SortedNumericDoubleValues values) throws IOException {
         ValuesSource.Numeric source = mock(ValuesSource.Numeric.class);
         when(source.isFloatingPoint()).thenReturn(true);
         when(source.doubleValues(null)).thenReturn(values);
-        return source;
+        return toConfig(
+            source,
+            CoreValuesSourceType.NUMERIC,
+            randomFrom(DocValueFormat.RAW, new DocValueFormat.Decimal("####.####")),
+            true
+        );
     }
 
-    private void withMetric(ValuesSource.Numeric valuesSource,
-            CheckedBiConsumer<Metrics, MetricSource, IOException> consumer) throws IOException {
-        withMetrics(singletonList(valuesSource), (m, sources) -> consumer.accept(m, sources.get(0)));
+    private ValuesSourceConfig toConfig(SortedSetDocValues values) throws IOException {
+        ValuesSource.Bytes.WithOrdinals source = mock(ValuesSource.Bytes.WithOrdinals.class);
+        when(source.globalOrdinalsValues(null)).thenReturn(values);
+        ValuesSourceConfig config = toConfig(source, CoreValuesSourceType.BYTES, DocValueFormat.RAW, true);
+        when(config.hasGlobalOrdinals()).thenReturn(true);
+        return config;
     }
 
-    private void withMetrics(List<ValuesSource.Numeric> valuesSources,
-            CheckedBiConsumer<Metrics, List<MetricSource>, IOException> consumer) throws IOException {
-        Set<String> names = new HashSet<>();
-        List<MetricSource> sources = new ArrayList<>(valuesSources.size());
-        for (ValuesSource.Numeric valuesSource : valuesSources) {
-            String name = randomValueOtherThanMany(names::contains, () -> randomAlphaOfLength(5));
-            names.add(name);
-            sources.add(new MetricSource(name, randomDocValueFormat(), valuesSource));
+    private final Set<String> names = new HashSet<>();
+
+    private ValuesSourceConfig toConfig(ValuesSource source, ValuesSourceType type, DocValueFormat format, boolean hasValues) {
+        String name = randomValueOtherThanMany(names::contains, () -> randomAlphaOfLength(5));
+        names.add(name);
+        ValuesSourceConfig config = mock(ValuesSourceConfig.class);
+        when(config.fieldContext()).thenReturn(new FieldContext(name, null, null));
+        when(config.valueSourceType()).thenReturn(type);
+        when(config.format()).thenReturn(format);
+        when(config.getValuesSource()).thenReturn(source);
+        when(config.hasValues()).thenReturn(hasValues);
+        return config;
+    }
+
+    private void withMetric(ValuesSourceConfig config, CheckedConsumer<Metrics, IOException> consumer) throws IOException {
+        withMetrics(singletonList(config), consumer);
+    }
+
+    private void withMetrics(List<ValuesSourceConfig> configs, CheckedConsumer<Metrics, IOException> consumer) throws IOException {
+        MetricValues[] values = new MetricValues[configs.size()];
+        for (int i = 0; i < configs.size(); i++) {
+            values[i] = TopMetricsAggregator.buildMetricValues(
+                REGISTRY,
+                BigArrays.NON_RECYCLING_INSTANCE,
+                1,
+                configs.get(i).fieldContext().field(),
+                configs.get(i)
+            );
         }
-        try (Metrics m = new Metrics(1, BigArrays.NON_RECYCLING_INSTANCE, sources)) {
-            consumer.accept(m, sources);
+        try (Metrics m = new Metrics(values)) {
+            consumer.accept(m);
         }
     }
 
-    private void assertNullMetric(Metrics m, MetricSource source, long index) {
-        DocValueFormat sortFormat = randomDocValueFormat();
-        assertThat(m.metric(source.getName(), index), notANumber());
+    private void assertNullMetric(Metrics m, ValuesSourceConfig config, long index, boolean assertSortValue) throws IOException {
+        if (assertSortValue) {
+            assertThat(m.metric(config.fieldContext().field(), index), notANumber());
+        }
+        DocValueFormat sortFormat = randomWholeNumberDocValuesFormat();
         TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1));
         assertThat(metric.getSortFormat(), sameInstance(sortFormat));
         assertThat(metric.getMetricValues(), equalTo(singletonList(null)));
     }
 
-    private void assertMetricValue(Metrics m, long index, MetricSource source, SortValue value) {
-        DocValueFormat sortFormat = randomDocValueFormat();
-        assertThat(m.metric(source.getName(), index), equalTo(value.numberValue().doubleValue()));
+    private void assertMetricValue(Metrics m, long index, ValuesSourceConfig config, SortValue value, boolean assertSortValue)
+        throws IOException {
+        if (assertSortValue) {
+            assertThat(m.metric(config.fieldContext().field(), index), equalTo(value.numberValue().doubleValue()));
+        }
+        DocValueFormat sortFormat = randomWholeNumberDocValuesFormat();
         TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1));
         assertThat(metric.getSortValue(), equalTo(SortValue.from(1)));
         assertThat(metric.getSortFormat(), sameInstance(sortFormat));
-        assertThat(metric.getMetricValues(), equalTo(singletonList(new MetricValue(source.getFormat(), value))));
+        assertThat(metric.getMetricValues(), equalTo(singletonList(new MetricValue(config.format(), value))));
     }
 
-    private void assertLoadTwoAndSwap(Metrics m, MetricSource source, SortValue firstValue, SortValue secondValue) throws IOException {
+    private void assertLoadTwoAndSwap(
+        Metrics m,
+        ValuesSourceConfig config,
+        SortValue firstValue,
+        SortValue secondValue,
+        boolean assertSortValue
+    ) throws IOException {
         m.loader(null).loadFromDoc(0, 0);
         m.loader(null).loadFromDoc(1, 1);
-        assertMetricValue(m, 0, source, firstValue);
-        assertMetricValue(m, 1, source, secondValue);
+        assertMetricValue(m, 0, config, firstValue, assertSortValue);
+        assertMetricValue(m, 1, config, secondValue, assertSortValue);
         m.swap(0, 1);
-        assertMetricValue(m, 0, source, secondValue);
-        assertMetricValue(m, 1, source, firstValue);
+        assertMetricValue(m, 0, config, secondValue, assertSortValue);
+        assertMetricValue(m, 1, config, firstValue, assertSortValue);
         m.loader(null).loadFromDoc(2, 2); // 2 is empty
-        assertNullMetric(m, source, 2);
+        assertNullMetric(m, config, 2, assertSortValue);
         m.swap(0, 2);
-        assertNullMetric(m, source, 0);
-        assertMetricValue(m, 2, source, secondValue);
+        assertNullMetric(m, config, 0, assertSortValue);
+        assertMetricValue(m, 2, config, secondValue, assertSortValue);
     }
 
-    private DocValueFormat randomDocValueFormat() {
-        return randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
+    private DocValueFormat randomWholeNumberDocValuesFormat() {
+        return randomFrom(DocValueFormat.RAW, new DocValueFormat.Decimal("####"));
     }
 }

+ 33 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml

@@ -550,3 +550,36 @@
   - match: { aggregations.tm.top.1.sort: [2] }
   - match: { aggregations.tm.top.2.metrics.v: 3.1414999961853027 }
   - match: { aggregations.tm.top.2.sort: [1] }
+
+---
+"fetch keyword":
+  - do:
+      bulk:
+        index: test
+        refresh: true
+        body:
+          - '{"index": {}}'
+          - '{"s": 1, "animal": "cat"}'
+          - '{"index": {}}'
+          - '{"s": 2, "animal": "dog"}'
+          - '{"index": {}}'
+          - '{"s": 3, "animal": "chicken"}'
+
+  - do:
+      search:
+        size: 0
+        body:
+          aggs:
+            tm:
+              top_metrics:
+                metrics:
+                  field: animal.keyword
+                sort:
+                  s: asc
+                size: 3
+  - match: { aggregations.tm.top.0.metrics.animal\.keyword: cat}
+  - match: { aggregations.tm.top.0.sort: [1] }
+  - match: { aggregations.tm.top.1.metrics.animal\.keyword: dog}
+  - match: { aggregations.tm.top.1.sort: [2] }
+  - match: { aggregations.tm.top.2.metrics.animal\.keyword: chicken}
+  - match: { aggregations.tm.top.2.sort: [3] }