Browse Source

Support multiple metrics in `top_metrics` agg (#52965)

This adds support for returning multiple metrics to the `top_metrics`
agg. It looks like:
```
POST /test/_search?filter_path=aggregations
{
  "aggs": {
    "tm": {
      "top_metrics": {
        "metrics": [
          {"field": "v"},
          {"field": "m"}
        ],
        "sort": {"s": "desc"}
      }
    }
  }
}
```
Nik Everett 5 years ago
parent
commit
56058ab6af
15 changed files with 477 additions and 201 deletions
  1. 11 5
      client/rest-high-level/src/main/java/org/elasticsearch/client/analytics/TopMetricsAggregationBuilder.java
  2. 16 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/analytics/AnalyticsAggsIT.java
  3. 60 10
      docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc
  4. 4 2
      server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinAggregatorTests.java
  5. 3 2
      server/src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricAggregatorTests.java
  6. 4 3
      test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
  7. 59 48
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetrics.java
  8. 20 15
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java
  9. 127 34
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java
  10. 14 18
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java
  11. 7 6
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetricsReduceTests.java
  12. 67 21
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetricsTests.java
  13. 10 4
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilderTests.java
  14. 58 14
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java
  15. 17 17
      x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml

+ 11 - 5
client/rest-high-level/src/main/java/org/elasticsearch/client/analytics/TopMetricsAggregationBuilder.java

@@ -32,6 +32,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -49,20 +51,20 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
 
     private final SortBuilder<?> sort;
     private final int size;
-    private final String metric;
+    private final List<String> metrics;
 
     /**
      * Build the request.
      * @param name the name of the metric
      * @param sort the sort key used to select the top metrics
      * @param size number of results to return per bucket
-     * @param metric the name of the field to select
+     * @param metrics the names of the fields to select
      */
-    public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, int size, String metric) {
+    public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, int size, String... metrics) {
         super(name);
         this.sort = sort;
         this.size = size;
-        this.metric = metric;
+        this.metrics = Arrays.asList(metrics);
     }
 
     @Override
@@ -78,7 +80,11 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
             sort.toXContent(builder, params);
             builder.endArray();
             builder.field("size", size);
-            builder.startObject("metric").field("field", metric).endObject();
+            builder.startArray("metrics");
+            for (String metric: metrics) {
+                builder.startObject().field("field", metric).endObject();
+            }
+            builder.endArray();
         }
         return builder.endObject();
     }

+ 16 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/analytics/AnalyticsAggsIT.java

@@ -74,6 +74,20 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
         assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3.0)));
     }
 
+    public void testTopMetricsManyMetrics() throws IOException {
+        indexTopMetricsData();
+        SearchRequest search = new SearchRequest("test");
+        search.source().aggregation(new TopMetricsAggregationBuilder(
+                "test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v", "m"));
+        SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
+        ParsedTopMetrics top = response.getAggregations().get("test");
+        assertThat(top.getTopMetrics(), hasSize(1));
+        ParsedTopMetrics.TopMetrics metric = top.getTopMetrics().get(0);
+        assertThat(metric.getSort(), equalTo(singletonList(2)));
+        assertThat(metric.getMetrics(), hasEntry("v", 3.0));
+        assertThat(metric.getMetrics(), hasEntry("m", 13.0));
+    }
+
     public void testTopMetricsSizeTwo() throws IOException {
         indexTopMetricsData();
         SearchRequest search = new SearchRequest("test");
@@ -92,8 +106,8 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
 
     private void indexTopMetricsData() throws IOException {
         BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
-        bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2));
-        bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3));
+        bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2.0, "m", 12.0));
+        bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3.0, "m", 13.0));
         highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
     }
 }

+ 60 - 10
docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc

@@ -22,7 +22,7 @@ POST /test/_search?filter_path=aggregations
   "aggs": {
     "tm": {
       "top_metrics": {
-        "metric": {"field": "v"},
+        "metrics": {"field": "v"},
         "sort": {"s": "desc"}
       }
     }
@@ -68,11 +68,61 @@ request. So,
 NOTE: This aggregation doesn't support any sort of "tie breaking". If two documents have
 the same sort values then this aggregation could return either document's fields.
 
-==== `metric`
+==== `metrics`
+
+`metrics` selects the fields to of the "top" document to return. Like most other
+aggregations, `top_metrics` casts these values cast to `double` precision
+floating point numbers. So they have to be numeric. Dates *work*, but they
+come back as a `double` precision floating point containing milliseconds since
+epoch. `keyword` fields aren't allowed.
+
+You can return multiple metrics by providing a list:
+
+[source,console,id=search-aggregations-metrics-top-metrics-list-of-metrics]
+----
+POST /test/_bulk?refresh
+{"index": {}}
+{"s": 1, "v": 3.1415, "m": 1.9}
+{"index": {}}
+{"s": 2, "v": 1.0, "m": 6.7}
+{"index": {}}
+{"s": 3, "v": 2.71828, "m": -12.2}
+POST /test/_search?filter_path=aggregations
+{
+  "aggs": {
+    "tm": {
+      "top_metrics": {
+        "metrics": [
+          {"field": "v"},
+          {"field": "m"}
+        ],
+        "sort": {"s": "desc"}
+      }
+    }
+  }
+}
+----
+
+Which returns:
+
+[source,js]
+----
+{
+  "aggregations": {
+    "tm": {
+      "top": [ {
+        "sort": [3],
+        "metrics": {
+          "v": 2.718280076980591,
+          "m": -12.199999809265137
+        }
+      } ]
+    }
+  }
+}
+----
+// TESTRESPONSE
 
-At this point `metric` supports only `{"field": "field_name"}` and all metrics
-are returned as double precision floating point numbers. Expect more to
-come here.
 
 ==== `size`
 
@@ -92,7 +142,7 @@ POST /test/_search?filter_path=aggregations
   "aggs": {
     "tm": {
       "top_metrics": {
-        "metric": {"field": "v"},
+        "metrics": {"field": "v"},
         "sort": {"s": "desc"},
         "size": 2
       }
@@ -174,7 +224,7 @@ POST /node/_search?filter_path=aggregations
       "aggs": {
         "tm": {
           "top_metrics": {
-            "metric": {"field": "v"},
+            "metrics": {"field": "v"},
             "sort": {"date": "desc"}
           }
         }
@@ -230,7 +280,7 @@ POST /node/_search?filter_path=aggregations
       "aggs": {
         "tm": {
           "top_metrics": {
-            "metric": {"field": "v"},
+            "metrics": {"field": "v"},
             "sort": {"date": "desc"}
           }
         }
@@ -292,7 +342,7 @@ POST /test*/_search?filter_path=aggregations
   "aggs": {
     "tm": {
       "top_metrics": {
-        "metric": {"field": "v"},
+        "metrics": {"field": "v"},
         "sort": {"s": "asc"}
       }
     }
@@ -325,7 +375,7 @@ POST /test*/_search?filter_path=aggregations
   "aggs": {
     "tm": {
       "top_metrics": {
-        "metric": {"field": "v"},
+        "metrics": {"field": "v"},
         "sort": {"s": {"order": "asc", "numeric_type": "double"}}
       }
     }

+ 4 - 2
server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinAggregatorTests.java

@@ -51,6 +51,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -180,8 +181,9 @@ public class MinAggregatorTests extends AggregatorTestCase {
 
     @Override
     protected QueryShardContext queryShardContextMock(IndexSearcher searcher, MapperService mapperService,
-                                                      IndexSettings indexSettings, CircuitBreakerService circuitBreakerService) {
-         this.queryShardContext = super.queryShardContextMock(searcher, mapperService, indexSettings, circuitBreakerService);
+                                                      IndexSettings indexSettings, CircuitBreakerService circuitBreakerService,
+                                                      BigArrays bigArrays) {
+         this.queryShardContext = super.queryShardContextMock(searcher, mapperService, indexSettings, circuitBreakerService, bigArrays);
          return queryShardContext;
     }
 

+ 3 - 2
server/src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricAggregatorTests.java

@@ -421,11 +421,12 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
     protected QueryShardContext queryShardContextMock(IndexSearcher searcher,
                                                         MapperService mapperService,
                                                         IndexSettings indexSettings,
-                                                        CircuitBreakerService circuitBreakerService) {
+                                                        CircuitBreakerService circuitBreakerService,
+                                                        BigArrays bigArrays) {
         MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS, Collections.emptyMap());
         Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
         ScriptService scriptService =  new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
-        return new QueryShardContext(0, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, null, null, mapperService, null, scriptService,
+        return new QueryShardContext(0, indexSettings, bigArrays, null, null, mapperService, null, scriptService,
                 xContentRegistry(), writableRegistry(), null, null, System::currentTimeMillis, null, null, () -> true);
     }
 }

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -285,7 +285,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
         when(searchContext.lookup()).thenReturn(searchLookup);
 
         QueryShardContext queryShardContext =
-            queryShardContextMock(contextIndexSearcher, mapperService, indexSettings, circuitBreakerService);
+            queryShardContextMock(contextIndexSearcher, mapperService, indexSettings, circuitBreakerService, bigArrays);
         when(searchContext.getQueryShardContext()).thenReturn(queryShardContext);
         when(queryShardContext.getObjectMapper(anyString())).thenAnswer(invocation -> {
             String fieldName = (String) invocation.getArguments()[0];
@@ -336,9 +336,10 @@ public abstract class AggregatorTestCase extends ESTestCase {
     protected QueryShardContext queryShardContextMock(IndexSearcher searcher,
                                                       MapperService mapperService,
                                                       IndexSettings indexSettings,
-                                                      CircuitBreakerService circuitBreakerService) {
+                                                      CircuitBreakerService circuitBreakerService,
+                                                      BigArrays bigArrays) {
 
-        return new QueryShardContext(0, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, null,
+        return new QueryShardContext(0, indexSettings, bigArrays, null,
             getIndexFieldDataLookup(mapperService, circuitBreakerService),
             mapperService, null, getMockScriptService(), xContentRegistry(),
             writableRegistry(), null, searcher, System::currentTimeMillis, null, null, () -> true);

+ 59 - 48
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetrics.java

@@ -20,23 +20,27 @@ import org.elasticsearch.search.sort.SortValue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import static java.util.Collections.emptyList;
+import static org.elasticsearch.search.builder.SearchSourceBuilder.SORT_FIELD;
+import static  org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.METRIC_FIELD;
+
 
 public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiValue {
     private final SortOrder sortOrder;
     private final int size;
-    private final String metricName;
+    private final List<String> metricNames;
     private final List<TopMetric> topMetrics;
 
-    public InternalTopMetrics(String name, @Nullable SortOrder sortOrder, String metricName,
+    public InternalTopMetrics(String name, @Nullable SortOrder sortOrder, List<String> metricNames,
             int size, List<TopMetric> topMetrics, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
         super(name, pipelineAggregators, metaData);
         this.sortOrder = sortOrder;
-        this.metricName = metricName;
+        this.metricNames = metricNames;
         /*
          * topMetrics.size won't be size when the bucket doesn't have size docs!
          */
@@ -44,9 +48,9 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
         this.topMetrics = topMetrics;
     }
 
-    static InternalTopMetrics buildEmptyAggregation(String name, String metricField,
+    static InternalTopMetrics buildEmptyAggregation(String name, List<String> metricNames,
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
-        return new InternalTopMetrics(name, SortOrder.ASC, metricField, 0, emptyList(), pipelineAggregators, metaData);
+        return new InternalTopMetrics(name, SortOrder.ASC, metricNames, 0, emptyList(), pipelineAggregators, metaData);
     }
 
     /**
@@ -55,7 +59,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
     public InternalTopMetrics(StreamInput in) throws IOException {
         super(in);
         sortOrder = SortOrder.readFromStream(in);
-        metricName = in.readString();
+        metricNames = in.readStringList();
         size = in.readVInt();
         topMetrics = in.readList(TopMetric::new);
     }
@@ -63,7 +67,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
     @Override
     protected void doWriteTo(StreamOutput out) throws IOException {
         sortOrder.writeTo(out);
-        out.writeString(metricName);
+        out.writeStringCollection(metricNames);
         out.writeVInt(size);
         out.writeList(topMetrics);
     }
@@ -78,15 +82,19 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
         if (path.isEmpty()) {
             return this;
         }
-        if (path.size() == 1 && metricName.contentEquals(path.get(1))) {
-            if (topMetrics.isEmpty()) {
-                // Unmapped.
-                return null;
-            }
-            assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
-            return topMetrics.get(0).metricValue;
+        if (path.size() != 1) {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        }
+        int index = metricNames.indexOf(path.get(0));
+        if (index < 0) {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
         }
-        throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        if (topMetrics.isEmpty()) {
+            // Unmapped.
+            return null;
+        }
+        assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
+        return topMetrics.get(0).metricValues[index];
     }
 
     @Override
@@ -116,7 +124,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
                 queue.updateTop();
             }
         }
-        return new InternalTopMetrics(getName(), sortOrder, metricName, size, merged, pipelineAggregators(), getMetaData());
+        return new InternalTopMetrics(getName(), sortOrder, metricNames, size, merged, pipelineAggregators(), getMetaData());
     }
 
     @Override
@@ -128,7 +136,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         builder.startArray("top");
         for (TopMetric top : topMetrics) {
-            top.toXContent(builder, metricName);
+            top.toXContent(builder, metricNames);
         }
         builder.endArray();
         return builder;
@@ -136,7 +144,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), sortOrder, metricName, size, topMetrics);
+        return Objects.hash(super.hashCode(), sortOrder, metricNames, size, topMetrics);
     }
 
     @Override
@@ -144,21 +152,22 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
         if (super.equals(obj) == false) return false;
         InternalTopMetrics other = (InternalTopMetrics) obj;
         return sortOrder.equals(other.sortOrder) &&
-            metricName.equals(other.metricName) &&
+            metricNames.equals(other.metricNames) &&
             size == other.size &&
             topMetrics.equals(other.topMetrics);
     }
 
     @Override
     public double value(String name) {
-        if (metricName.equals(name)) {
-            if (topMetrics.isEmpty()) {
-                return Double.NaN;
-            }
-            assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
-            return topMetrics.get(0).metricValue;
+        int index = metricNames.indexOf(name);
+        if (index < 0) {
+            throw new IllegalArgumentException("unknown metric [" + name + "]");            
         }
-        throw new IllegalArgumentException("known metric [" + name + "]");
+        if (topMetrics.isEmpty()) {
+            return Double.NaN;
+        }
+        assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
+        return topMetrics.get(0).metricValues[index];
     }
 
     SortOrder getSortOrder() {
@@ -169,8 +178,8 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
         return size;
     }
 
-    String getMetricName() {
-        return metricName;
+    List<String> getMetricNames() {
+        return metricNames;
     }
 
     List<TopMetric> getTopMetrics() {
@@ -197,18 +206,25 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
     static class TopMetric implements Writeable, Comparable<TopMetric> {
         private final DocValueFormat sortFormat;
         private final SortValue sortValue;
-        private final double metricValue;
+        private final double[] metricValues;
 
-        TopMetric(DocValueFormat sortFormat, SortValue sortValue, double metricValue) {
+        TopMetric(DocValueFormat sortFormat, SortValue sortValue, double[] metricValues) {
             this.sortFormat = sortFormat;
             this.sortValue = sortValue;
-            this.metricValue = metricValue;
+            this.metricValues = metricValues;
         }
 
         TopMetric(StreamInput in) throws IOException {
             sortFormat = in.readNamedWriteable(DocValueFormat.class);
             sortValue = in.readNamedWriteable(SortValue.class);
-            metricValue = in.readDouble();
+            metricValues = in.readDoubleArray();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeNamedWriteable(sortFormat);
+            out.writeNamedWriteable(sortValue);
+            out.writeDoubleArray(metricValues);
         }
 
         DocValueFormat getSortFormat() {
@@ -219,26 +235,21 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
             return sortValue;
         }
 
-        double getMetricValue() {
-            return metricValue;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.writeNamedWriteable(sortFormat);
-            out.writeNamedWriteable(sortValue);
-            out.writeDouble(metricValue);
+        double[] getMetricValues() {
+            return metricValues;
         }
 
-        public XContentBuilder toXContent(XContentBuilder builder, String metricName) throws IOException {
+        public XContentBuilder toXContent(XContentBuilder builder, List<String> metricNames) throws IOException {
             builder.startObject();
             {
-                builder.startArray("sort");
+                builder.startArray(SORT_FIELD.getPreferredName());
                 sortValue.toXContent(builder, sortFormat);
                 builder.endArray();
-                builder.startObject("metrics");
+                builder.startObject(METRIC_FIELD.getPreferredName());
                 {
-                    builder.field(metricName, Double.isNaN(metricValue) ? null : metricValue);
+                    for (int i = 0; i < metricValues.length; i++) {
+                        builder.field(metricNames.get(i), Double.isNaN(metricValues[i]) ? null : metricValues[i]);
+                    }
                 }
                 builder.endObject();
             }
@@ -258,17 +269,17 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
             TopMetric other = (TopMetric) obj;
             return sortFormat.equals(other.sortFormat)
                     && sortValue.equals(other.sortValue)
-                    && metricValue == other.metricValue;
+                    && Arrays.equals(metricValues, other.metricValues);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(sortFormat, sortValue, metricValue);
+            return Objects.hash(sortFormat, sortValue, Arrays.hashCode(metricValues));
         }
 
         @Override
         public String toString() {
-            return "TopMetric[" + sortFormat + "," + sortValue + "," + metricValue + "]"; 
+            return "TopMetric[" + sortFormat + "," + sortValue + "," + Arrays.toString(metricValues) + "]"; 
         }
     }
 }

+ 20 - 15
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java

@@ -32,7 +32,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.SORT_FIELD;
 
 public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<TopMetricsAggregationBuilder> {
     public static final String NAME = "top_metrics";
-    public static final ParseField METRIC_FIELD = new ParseField("metric");
+    public static final ParseField METRIC_FIELD = new ParseField("metrics");
 
     /**
      * Default to returning only a single top metric.
@@ -47,34 +47,35 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
                 if (size < 1) {
                     throw new IllegalArgumentException("[size] must be more than 0 but was [" + size + "]");
                 }
-                MultiValuesSourceFieldConfig metricField = (MultiValuesSourceFieldConfig) args[2];
-                return new TopMetricsAggregationBuilder(name, sorts, size, metricField);
+                @SuppressWarnings("unchecked")
+                List<MultiValuesSourceFieldConfig> metricFields = (List<MultiValuesSourceFieldConfig>) args[2];
+                return new TopMetricsAggregationBuilder(name, sorts, size, metricFields);
             });
     static {
         PARSER.declareField(constructorArg(), (p, n) -> SortBuilder.fromXContent(p), SORT_FIELD,
                 ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
         PARSER.declareInt(optionalConstructorArg(), SIZE_FIELD);
         ContextParser<Void, MultiValuesSourceFieldConfig.Builder> metricParser = MultiValuesSourceFieldConfig.PARSER.apply(true, false);
-        PARSER.declareObject(constructorArg(), (p, n) -> metricParser.parse(p, null).build(), METRIC_FIELD);
+        PARSER.declareObjectArray(constructorArg(), (p, n) -> metricParser.parse(p, null).build(), METRIC_FIELD);
     }
 
     private final List<SortBuilder<?>> sortBuilders;
-    // TODO MultiValuesSourceFieldConfig has more things than we support and less things than we want to support
     private final int size;
-    private final MultiValuesSourceFieldConfig metricField;
+    private final List<MultiValuesSourceFieldConfig> metricFields;
+    // TODO replace with ValuesSourceConfig once the value source refactor has landed
 
     /**
      * Build a {@code top_metrics} aggregation request.
      */
     public TopMetricsAggregationBuilder(String name, List<SortBuilder<?>> sortBuilders, int size,
-            MultiValuesSourceFieldConfig metricField) {
+            List<MultiValuesSourceFieldConfig> metricFields) {
         super(name);
         if (sortBuilders.size() != 1) {
             throw new IllegalArgumentException("[sort] must contain exactly one sort");
         }
         this.sortBuilders = sortBuilders;
         this.size = size;
-        this.metricField = metricField;
+        this.metricFields = metricFields;
     }
 
     /**
@@ -85,7 +86,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
         super(clone, factoriesBuilder, metaData);
         this.sortBuilders = clone.sortBuilders;
         this.size = clone.size;
-        this.metricField = clone.metricField;
+        this.metricFields = clone.metricFields;
     }
 
     /**
@@ -97,14 +98,14 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
         List<SortBuilder<?>> sortBuilders = (List<SortBuilder<?>>) (List<?>) in.readNamedWriteableList(SortBuilder.class);
         this.sortBuilders = sortBuilders;
         this.size = in.readVInt();
-        this.metricField = new MultiValuesSourceFieldConfig(in);
+        this.metricFields = in.readList(MultiValuesSourceFieldConfig::new);
     }
 
     @Override
     protected void doWriteTo(StreamOutput out) throws IOException {
         out.writeNamedWriteableList(sortBuilders);
         out.writeVInt(size);
-        metricField.writeTo(out);
+        out.writeList(metricFields);
     }
 
     @Override
@@ -116,7 +117,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
     protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, Builder subFactoriesBuilder)
             throws IOException {
         return new TopMetricsAggregatorFactory(name, queryShardContext, parent, subFactoriesBuilder, metaData, sortBuilders,
-                size, metricField);
+                size, metricFields);
     }
 
     @Override
@@ -129,7 +130,11 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
             }
             builder.endArray();
             builder.field(SIZE_FIELD.getPreferredName(), size);
-            builder.field(METRIC_FIELD.getPreferredName(), metricField);
+            builder.startArray(METRIC_FIELD.getPreferredName());
+            for (MultiValuesSourceFieldConfig metricField: metricFields) {
+                metricField.toXContent(builder, params);
+            }
+            builder.endArray();
         }
         builder.endObject();
         return builder;
@@ -148,7 +153,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
         return size;
     }
 
-    MultiValuesSourceFieldConfig getMetricField() {
-        return metricField;
+    List<MultiValuesSourceFieldConfig> getMetricFields() {
+        return metricFields;
     }
 }

+ 127 - 34
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.index.fielddata.NumericDoubleValues;
+import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.MultiValueMode;
 import org.elasticsearch.search.aggregations.Aggregator;
@@ -46,25 +47,23 @@ import java.util.Map;
  */
 class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
     private final int size;
-    private final String metricName;
     private final BucketedSort sort;
-    private final Values values;
-    private final ValuesSource.Numeric metricValueSource;
+    private final Metrics metrics;
 
     TopMetricsAggregator(String name, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
-            Map<String, Object> metaData, int size, String metricName,
-            SortBuilder<?> sort, ValuesSource.Numeric metricValueSource) throws IOException {
+            Map<String, Object> metaData, int size,
+            SortBuilder<?> sort, List<String> metricNames, List<ValuesSource.Numeric> metricValuesSources) throws IOException {
         super(name, context, parent, pipelineAggregators, metaData);
         this.size = size;
-        this.metricName = metricName;
-        this.metricValueSource = metricValueSource;
-        if (metricValueSource != null) {
-            values = new Values(size, context.bigArrays(), metricValueSource);
-            this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
-        } else {
-            values = null;
-            this.sort = null;
-        }
+        assert metricNames.size() == metricValuesSources.size();
+        metrics = new Metrics(size, context.getQueryShardContext(), metricNames, metricValuesSources);
+        /*
+         * If we're only collecting a single value then only provided *that*
+         * value to the sort so that swaps and loads are just a little faster
+         * in that *very* common case.
+         */
+        BucketedSort.ExtraData values = metricValuesSources.size() == 1 ? metrics.values[0] : metrics;
+        this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
     }
 
     @Override
@@ -72,7 +71,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         if (size != 1) {
             throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]");
         }
-        return metricName.equals(name);
+        return metrics.names.contains(name);
     }
 
     @Override
@@ -84,12 +83,12 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
          * be called after we've collected a bucket, so it won't just fetch
          * garbage.
          */
-        return values.values.get(owningBucketOrd);
+        return metrics.metric(name, owningBucketOrd);
     }
 
     @Override
     public ScoreMode scoreMode() {
-        boolean needs = (sort != null && sort.needsScores()) || (metricValueSource != null && metricValueSource.needsScores());
+        boolean needs = sort.needsScores() || metrics.needsScores();
         return needs ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
     }
 
@@ -97,9 +96,6 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         assert sub == LeafBucketCollector.NO_OP_COLLECTOR : "Expected noop but was " + sub.toString();
 
-        if (metricValueSource == null) {
-            return LeafBucketCollector.NO_OP_COLLECTOR;
-        }
         BucketedSort.Leaf leafSort = sort.forLeaf(ctx);
 
         return new LeafBucketCollector() {
@@ -117,41 +113,115 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
 
     @Override
     public InternalAggregation buildAggregation(long bucket) throws IOException {
-        if (metricValueSource == null) {
-            return buildEmptyAggregation();
-        }
-        List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, values.resultBuilder(sort.getFormat()));
+        List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, metrics.resultBuilder(sort.getFormat()));
         assert topMetrics.size() <= size;
-        return new InternalTopMetrics(name, sort.getOrder(), metricName, size, topMetrics, pipelineAggregators(), metaData());
+        return new InternalTopMetrics(name, sort.getOrder(), metrics.names, size, topMetrics, pipelineAggregators(), metaData());
     }
 
     @Override
     public InternalTopMetrics buildEmptyAggregation() {
-        // The sort format and sort order aren't used in reduction so we pass the simplest thing.
-        return InternalTopMetrics.buildEmptyAggregation(name, metricName, pipelineAggregators(),
-                metaData());
+        return InternalTopMetrics.buildEmptyAggregation(name, metrics.names, pipelineAggregators(), metaData());
     }
 
     @Override
     public void doClose() {
-        Releasables.close(sort, values);
+        Releasables.close(sort, metrics);
+    }
+
+    private static class Metrics implements BucketedSort.ExtraData, Releasable {
+        private final List<String> names;
+        private final MetricValues[] values;
+
+        Metrics(int size, QueryShardContext ctx, List<String> names, List<ValuesSource.Numeric> valuesSources) {
+            this.names = names; 
+            values = new MetricValues[valuesSources.size()];
+            int i = 0;
+            for (ValuesSource.Numeric valuesSource : valuesSources) {
+                if (valuesSource == null) {
+                    values[i++] = new MissingMetricValues();
+                    continue;
+                }
+                values[i++] = new CollectMetricValues(size, ctx.bigArrays(), valuesSource);
+            }
+        }
+
+        boolean needsScores() {
+            for (int i = 0; i < values.length; i++) {
+                if (values[i].needsScores()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        double metric(String name, long index) {
+            int valueIndex = names.indexOf(name);
+            if (valueIndex < 0) {
+                throw new IllegalArgumentException("[" + name + "] not found");
+            }
+            return values[valueIndex].value(index);
+        }
+
+        BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
+            return (index, sortValue) -> {
+                double[] result = new double[values.length];
+                for (int i = 0; i < values.length; i++) {
+                    result[i] = values[i].value(index);
+                }
+                return new InternalTopMetrics.TopMetric(sortFormat, sortValue, result);
+            };
+        }
+
+        @Override
+        public void swap(long lhs, long rhs) {
+            for (int i = 0; i < values.length; i++) {
+                values[i].swap(lhs, rhs);
+            }
+        }
+
+        @Override
+        public Loader loader(LeafReaderContext ctx) throws IOException {
+            Loader[] loaders = new Loader[values.length];
+            for (int i = 0; i < values.length; i++) {
+                loaders[i] = values[i].loader(ctx);
+            }
+            return (index, doc) -> {
+                for (int i = 0; i < loaders.length; i++) {
+                    loaders[i].loadFromDoc(index, doc);
+                }
+            };
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(values);
+        }
     }
 
-    private static class Values implements BucketedSort.ExtraData, Releasable {
+    private interface MetricValues extends BucketedSort.ExtraData, Releasable {
+        boolean needsScores();
+        double value(long index);
+    }
+    private static class CollectMetricValues implements MetricValues {
         private final BigArrays bigArrays;
         private final ValuesSource.Numeric metricValueSource;
 
         private DoubleArray values;
 
-        Values(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
+        CollectMetricValues(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
             this.bigArrays = bigArrays;
             this.metricValueSource = metricValueSource;
             values = bigArrays.newDoubleArray(size, false);
         }
 
-        BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
-            return (index, sortValue) ->
-                new InternalTopMetrics.TopMetric(sortFormat, sortValue, values.get(index));
+        @Override
+        public boolean needsScores() {
+            return metricValueSource.needsScores();
+        }
+
+        @Override
+        public double value(long index) {
+            return values.get(index);
         }
 
         @Override
@@ -179,4 +249,27 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
             values.close();
         }
     }
+    private static class MissingMetricValues implements MetricValues {
+        @Override
+        public double value(long index) {
+            return Double.NaN;
+        }
+
+        @Override
+        public boolean needsScores() {
+            return false;
+        }
+
+        @Override
+        public void swap(long lhs, long rhs) {}
+
+        @Override
+        public Loader loader(LeafReaderContext ctx) throws IOException {
+            return (index, doc) -> {};
+        }
+
+        @Override
+        public void close() {
+        }
+    }
 }

+ 14 - 18
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java

@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.stream.Collectors.toList;
+
 public class TopMetricsAggregatorFactory extends AggregatorFactory {
     /**
      * Index setting describing the maximum number of top metrics that
@@ -35,40 +37,34 @@ public class TopMetricsAggregatorFactory extends AggregatorFactory {
 
     private final List<SortBuilder<?>> sortBuilders;
     private final int size;
-    private final MultiValuesSourceFieldConfig metricField;
+    private final List<MultiValuesSourceFieldConfig> metricFields;
 
     public TopMetricsAggregatorFactory(String name, QueryShardContext queryShardContext, AggregatorFactory parent,
             Builder subFactoriesBuilder, Map<String, Object> metaData, List<SortBuilder<?>> sortBuilders,
-            int size, MultiValuesSourceFieldConfig metricField) throws IOException {
+            int size, List<MultiValuesSourceFieldConfig> metricFields) throws IOException {
         super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
         this.sortBuilders = sortBuilders;
         this.size = size;
-        this.metricField = metricField;
+        this.metricFields = metricFields;
     }
 
     @Override
     protected TopMetricsAggregator createInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket,
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
-        ValuesSourceConfig<ValuesSource.Numeric> metricFieldSource = ValuesSourceConfig.resolve(queryShardContext, ValueType.NUMERIC,
-                metricField.getFieldName(), metricField.getScript(), metricField.getMissing(), metricField.getTimeZone(), null);
-        ValuesSource.Numeric metricValueSource = metricFieldSource.toValuesSource(queryShardContext);
         int maxBucketSize = MAX_BUCKET_SIZE.get(searchContext.getQueryShardContext().getIndexSettings().getSettings());
         if (size > maxBucketSize) {
             throw new IllegalArgumentException("[top_metrics.size] must not be more than [" + maxBucketSize + "] but was [" + size
                     + "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey()
                     + "] index level setting.");
         }
-        if (metricValueSource == null) {
-            return createUnmapped(searchContext, parent, pipelineAggregators, metaData);
-        }
-        return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
-                sortBuilders.get(0), metricValueSource);
-    }
-
-    private TopMetricsAggregator createUnmapped(SearchContext searchContext, Aggregator parent,
-            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
-        return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
-                null, null);
+        List<String> metricNames = metricFields.stream().map(MultiValuesSourceFieldConfig::getFieldName).collect(toList());
+        List<ValuesSource.Numeric> metricValuesSources = metricFields.stream().map(config -> {
+                    ValuesSourceConfig<ValuesSource.Numeric> resolved = ValuesSourceConfig.resolve(
+                            searchContext.getQueryShardContext(), ValueType.NUMERIC,
+                            config.getFieldName(), config.getScript(), config.getMissing(), config.getTimeZone(), null);
+                    return resolved.toValuesSource(searchContext.getQueryShardContext());
+                }).collect(toList());
+        return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size,
+                sortBuilders.get(0), metricNames, metricValuesSources);
     }
-
 }

+ 7 - 6
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetricsReduceTests.java

@@ -15,6 +15,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -46,7 +47,7 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
         InternalTopMetrics winner = first.getSortOrder() == SortOrder.ASC ? min : max;
         InternalTopMetrics reduced = reduce(metrics);
         assertThat(reduced.getName(), equalTo("test"));
-        assertThat(reduced.getMetricName(), equalTo("test"));
+        assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
         assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
         assertThat(reduced.getSize(), equalTo(first.getSize()));
         assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
@@ -61,7 +62,7 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
         };
         InternalTopMetrics reduced = reduce(metrics);
         assertThat(reduced.getName(), equalTo("test"));
-        assertThat(reduced.getMetricName(), equalTo("test"));
+        assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
         assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
         assertThat(reduced.getSize(), equalTo(first.getSize()));
         assertThat(reduced.getTopMetrics(), equalTo(List.of(
@@ -75,14 +76,14 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
         // Doubles sort first.
         InternalTopMetrics winner = doubleMetrics.getSortOrder() == SortOrder.ASC ? doubleMetrics : longMetrics; 
         assertThat(reduced.getName(), equalTo("test"));
-        assertThat(reduced.getMetricName(), equalTo("test"));
+        assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
         assertThat(reduced.getSortOrder(), equalTo(doubleMetrics.getSortOrder()));
         assertThat(reduced.getSize(), equalTo(doubleMetrics.getSize()));
         assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
     }
 
     private InternalTopMetrics buildEmpty() {
-        return InternalTopMetrics.buildEmptyAggregation("test", "test", emptyList(), null);
+        return InternalTopMetrics.buildEmptyAggregation("test", singletonList("test"), emptyList(), null);
     }
 
     private InternalTopMetrics buildFilled(int size, InternalTopMetrics.TopMetric... metrics) {
@@ -90,12 +91,12 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
     }
 
     private InternalTopMetrics buildFilled(SortOrder sortOrder, int size, InternalTopMetrics.TopMetric... metrics) {
-        return new InternalTopMetrics("test", sortOrder, "test", size, Arrays.asList(metrics), emptyList(), null);
+        return new InternalTopMetrics("test", sortOrder, singletonList("test"), size, Arrays.asList(metrics), emptyList(), null);
     }
 
     private InternalTopMetrics.TopMetric top(SortValue sortValue, double metricValue) {
         DocValueFormat sortFormat = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN, DocValueFormat.IP);
-        return new InternalTopMetrics.TopMetric(sortFormat, sortValue, metricValue);
+        return new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {metricValue});
     }
 
     private InternalTopMetrics reduce(InternalTopMetrics... results) {

+ 67 - 21
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/InternalTopMetricsTests.java

@@ -26,16 +26,18 @@ import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
 
 public class InternalTopMetricsTests extends InternalAggregationTestCase<InternalTopMetrics> {
@@ -50,7 +52,7 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
 
     public void testEmptyIsNotMapped() {
         InternalTopMetrics empty = InternalTopMetrics.buildEmptyAggregation(
-                randomAlphaOfLength(5), randomAlphaOfLength(2), emptyList(), null);
+                randomAlphaOfLength(5), randomMetricNames(between(1, 5)), emptyList(), null);
         assertFalse(empty.isMapped());
     }
 
@@ -60,8 +62,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
     }
 
     public void testToXContentDoubleSortValue() throws IOException {
-        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
-                List.of(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0)), emptyList(), null);
+        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
+                List.of(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0})), emptyList(), null);
         assertThat(Strings.toString(tm, true, true), equalTo(
                 "{\n" +
                 "  \"test\" : {\n" +
@@ -83,8 +85,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
         DocValueFormat sortFormat = new DocValueFormat.DateTime(DateFormatter.forPattern("strict_date_time"), ZoneId.of("UTC"),
                 DateFieldMapper.Resolution.MILLISECONDS);
         SortValue sortValue = SortValue.from(ZonedDateTime.parse("2007-12-03T10:15:30Z").toInstant().toEpochMilli());
-        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
-                List.of(new InternalTopMetrics.TopMetric(sortFormat, sortValue, 1.0)), emptyList(), null);
+        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
+                List.of(new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {1.0})), emptyList(), null);
         assertThat(Strings.toString(tm, true, true), equalTo(
                 "{\n" +
                 "  \"test\" : {\n" +
@@ -102,11 +104,34 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
                 "}"));
     }
 
-    public void testToXContentManyValues() throws IOException {
-        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 2,
+    public void testToXContentManyMetrics() throws IOException {
+        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, List.of("foo", "bar", "baz"), 1,
+                List.of(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0, 2.0, 3.0})),
+                emptyList(), null);
+        assertThat(Strings.toString(tm, true, true), equalTo(
+                "{\n" +
+                "  \"test\" : {\n" +
+                "    \"top\" : [\n" +
+                "      {\n" +
+                "        \"sort\" : [\n" +
+                "          1.0\n" +
+                "        ],\n" +
+                "        \"metrics\" : {\n" +
+                "          \"foo\" : 1.0,\n" +
+                "          \"bar\" : 2.0,\n" +
+                "          \"baz\" : 3.0\n" +
+                "        }\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "}"));
+    }
+
+    public void testToXContentManyTopMetrics() throws IOException {
+        InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 2,
                 List.of(
-                    new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0),
-                    new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), 2.0)),
+                    new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0}),
+                    new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), new double[] {2.0})),
                 emptyList(), null);
         assertThat(Strings.toString(tm, true, true), equalTo(
                 "{\n" +
@@ -144,17 +169,18 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
     @Override
     protected InternalTopMetrics createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
             Map<String, Object> metaData) {
-        String metricName = randomAlphaOfLength(5);
+        int metricCount = between(1, 5);
+        List<String> metricNames = randomMetricNames(metricCount);
         int size = between(1, 100);
-        List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size));
-        return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics, pipelineAggregators, metaData);
+        List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size), metricCount);
+        return new InternalTopMetrics(name, sortOrder, metricNames, size, topMetrics, pipelineAggregators, metaData);
     }
 
     @Override
     protected InternalTopMetrics mutateInstance(InternalTopMetrics instance) throws IOException {
         String name = instance.getName();
         SortOrder sortOrder = instance.getSortOrder();
-        String metricName = instance.getMetricName();
+        List<String> metricNames = instance.getMetricNames();
         int size = instance.getSize();
         List<InternalTopMetrics.TopMetric> topMetrics = instance.getTopMetrics();
         switch (randomInt(4)) {
@@ -166,19 +192,21 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
             Collections.reverse(topMetrics);
             break;
         case 2:
-            metricName = randomAlphaOfLength(6);
+            metricNames = new ArrayList<>(metricNames);
+            metricNames.set(randomInt(metricNames.size() - 1), randomAlphaOfLength(6));
             break;
         case 3:
             size = randomValueOtherThan(size, () -> between(1, 100));
             break;
         case 4:
             int fixedSize = size;
-            topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize)));
+            int fixedMetricsSize = metricNames.size();
+            topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize), fixedMetricsSize));
             break;
         default:
             throw new IllegalArgumentException("bad mutation");
         }
-        return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics,
+        return new InternalTopMetrics(name, sortOrder, metricNames, size, topMetrics,
                 instance.pipelineAggregators(), instance.getMetaData());
     }
 
@@ -198,7 +226,11 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
             Object expectedSort = internalTop.getSortFormat() == DocValueFormat.RAW ?
                     internalTop.getSortValue().getKey() : internalTop.getSortValue().format(internalTop.getSortFormat());
             assertThat(parsedTop.getSort(), equalTo(singletonList(expectedSort)));
-            assertThat(parsedTop.getMetrics(), equalTo(singletonMap(aggregation.getMetricName(), internalTop.getMetricValue())));
+            assertThat(parsedTop.getMetrics().keySet(), hasSize(aggregation.getMetricNames().size()));
+            for (int m = 0; m < aggregation.getMetricNames().size(); m++) {
+                assertThat(parsedTop.getMetrics(),
+                        hasEntry(aggregation.getMetricNames().get(m), internalTop.getMetricValues()[m]));
+            }
         }
     }
 
@@ -213,17 +245,31 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
         List<InternalTopMetrics.TopMetric> winners = metrics.size() > first.getSize() ? metrics.subList(0, first.getSize()) : metrics;
         assertThat(reduced.getName(), equalTo(first.getName()));
         assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
-        assertThat(reduced.getMetricName(), equalTo(first.getMetricName()));
+        assertThat(reduced.getMetricNames(), equalTo(first.getMetricNames()));
         assertThat(reduced.getTopMetrics(), equalTo(winners));
     }
 
-    private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length) {
+    private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length, int metricCount) {
         return IntStream.range(0, length)
-                .mapToObj(i -> new InternalTopMetrics.TopMetric(randomNumericDocValueFormat(), randomSortValue(), randomDouble()))
+                .mapToObj(i -> new InternalTopMetrics.TopMetric(
+                        randomNumericDocValueFormat(), randomSortValue(), randomMetricValues(metricCount)
+                ))
                 .sorted((lhs, rhs) -> sortOrder.reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()))
                 .collect(toList());
     }
 
+    static List<String> randomMetricNames(int metricCount) {
+        Set<String> names = new HashSet<>(metricCount);
+        while (names.size() < metricCount) {
+            names.add(randomAlphaOfLength(5));
+        }
+        return new ArrayList<>(names);
+    }
+
+    private double[] randomMetricValues(int metricCount) {
+        return IntStream.range(0, metricCount).mapToDouble(i -> randomDouble()).toArray();
+    }
+
     private static SortValue randomSortValue() {
         if (randomBoolean()) {
             return SortValue.from(randomLong());

+ 10 - 4
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilderTests.java

@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -68,9 +69,14 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
     protected TopMetricsAggregationBuilder createTestInstance() {
         List<SortBuilder<?>> sortBuilders = singletonList(
                 new FieldSortBuilder(randomAlphaOfLength(5)).order(randomFrom(SortOrder.values())));
-        MultiValuesSourceFieldConfig.Builder metricField = new MultiValuesSourceFieldConfig.Builder();
-        metricField.setFieldName(randomAlphaOfLength(5)).setMissing(1.0);
-        return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, between(1, 100), metricField.build());
+        List<MultiValuesSourceFieldConfig> metricFields = InternalTopMetricsTests.randomMetricNames(between(1, 5)).stream()
+                .map(name -> {
+                    MultiValuesSourceFieldConfig.Builder metricField = new MultiValuesSourceFieldConfig.Builder();
+                    metricField.setFieldName(randomAlphaOfLength(5)).setMissing(1.0);
+                    return metricField.build();
+                })
+                .collect(toList());
+        return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, between(1, 100), metricFields);
     }
 
     public void testClientBuilder() throws IOException {
@@ -98,6 +104,6 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
                         serverBuilder.getName(),
                         serverBuilder.getSortBuilders().get(0),
                         serverBuilder.getSize(),
-                        serverBuilder.getMetricField().getFieldName());
+                        serverBuilder.getMetricFields().stream().map(MultiValuesSourceFieldConfig::getFieldName).toArray(String[]::new));
     }
 }

+ 58 - 14
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java

@@ -75,6 +75,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notANumber;
@@ -96,7 +97,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
                 },
                 numberFieldType(NumberType.DOUBLE, "s"));
         assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
-        assertThat(result.getTopMetrics(), equalTo(emptyList()));
+        assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, Double.NaN))));
     }
 
     public void testMissingValueForMetric() throws IOException {
@@ -107,7 +108,8 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
         assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
         assertThat(result.getTopMetrics(), hasSize(1));
         assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1.0)));
-        assertThat(result.getTopMetrics().get(0).getMetricValue(), notANumber());
+        assertThat(result.getTopMetrics().get(0).getMetricValues().length, equalTo(1));
+        assertThat(result.getTopMetrics().get(0).getMetricValues()[0], notANumber());
     }
 
     public void testActualValueForMetric() throws IOException {
@@ -131,7 +133,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
         InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)));
         assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
         assertThat(result.getTopMetrics(), equalTo(singletonList(
-                new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 2.0))));
+                new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {2.0}))));
     }
 
     public void testSortByDoubleDescending() throws IOException {
@@ -350,21 +352,50 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
                  * breaker. The number of buckets feels fairly arbitrary but
                  * it comes from:
                  * budget = 15k = 20k - 5k for the "default weight" of ever agg
-                 * The 922th bucket causes a resize which requests puts the total
-                 * just over 15k.O
+                 * The 646th bucket causes a resize which requests puts the total
+                 * just over 15k. This works out to more like 190 bits per bucket
+                 * when we're fairly sure this should take about 129 bits per
+                 * bucket. The difference is because, for arrays in of this size,
+                 * BigArrays allocates the new array before freeing the old one.
+                 * That causes us to trip when we're about 2/3 of the way to the
+                 * limit. And 2/3 of 190 is 126. Which is pretty much what we
+                 * expect. Sort of. 
                  */
-                int bucketThatBreaks = 922; 
+                int bucketThatBreaks = 646;
                 for (int b = 0; b < bucketThatBreaks; b++) {
-                    leaf.collect(0, b);
+                    try {
+                        leaf.collect(0, b);
+                    } catch (CircuitBreakingException e) {
+                        throw new AssertionError("Unexpected circuit break at [" + b + "]. Expected at [" + bucketThatBreaks + "]", e);
+                    }
                 }
                 CircuitBreakingException e = expectThrows(CircuitBreakingException.class, () -> leaf.collect(0, bucketThatBreaks));
                 assertThat(e.getMessage(), equalTo("test error"));
                 assertThat(e.getByteLimit(), equalTo(max.getBytes()));
-                assertThat(e.getBytesWanted(), equalTo(16440L));
+                assertThat(e.getBytesWanted(), equalTo(5872L));
             }
         }
     }
 
+    public void testManyMetrics() throws IOException {
+        List<SortBuilder<?>> sorts = singletonList(new FieldSortBuilder("s").order(SortOrder.ASC));
+        TopMetricsAggregationBuilder builder = new TopMetricsAggregationBuilder("test", sorts, 1,
+                List.of(
+                        new MultiValuesSourceFieldConfig.Builder().setFieldName("m1").build(),
+                        new MultiValuesSourceFieldConfig.Builder().setFieldName("m2").build(),
+                        new MultiValuesSourceFieldConfig.Builder().setFieldName("m3").build()
+                ));
+        InternalTopMetrics result = collect(builder, new MatchAllDocsQuery(), writer -> {
+                    writer.addDocument(Arrays.asList(doubleField("s", 1.0),
+                            doubleField("m1", 12.0), doubleField("m2", 22.0), doubleField("m3", 32.0)));
+                    writer.addDocument(Arrays.asList(doubleField("s", 2.0),
+                            doubleField("m1", 13.0), doubleField("m2", 23.0), doubleField("m3", 33.0)));
+                }, manyMetricsFields());
+        assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
+        assertThat(result.getTopMetrics(), equalTo(singletonList(
+                new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {12.0, 22.0, 32.0}))));
+    }
+
     private TopMetricsAggregationBuilder simpleBuilder() {
         return simpleBuilder(new FieldSortBuilder("s"));
     }
@@ -375,7 +406,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
 
     private TopMetricsAggregationBuilder simpleBuilder(SortBuilder<?> sort, int size) {
         return new TopMetricsAggregationBuilder("test", singletonList(sort), size,
-                new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build());
+                singletonList(new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build()));
     }
 
     /**
@@ -395,6 +426,16 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
         return new MappedFieldType[] {numberFieldType(NumberType.DOUBLE, "s"), numberFieldType(NumberType.DOUBLE, "m")};
     }
 
+    private MappedFieldType[] manyMetricsFields() {
+        return new MappedFieldType[] {
+                numberFieldType(NumberType.DOUBLE, "s"),
+                numberFieldType(NumberType.DOUBLE, "m1"),
+                numberFieldType(NumberType.DOUBLE, "m2"),
+                numberFieldType(NumberType.DOUBLE, "m3"),
+        };
+    }
+
+
     private MappedFieldType[] floatAndDoubleField() {
         return new MappedFieldType[] {numberFieldType(NumberType.FLOAT, "s"), numberFieldType(NumberType.DOUBLE, "m")};
     }
@@ -453,7 +494,10 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
     private InternalTopMetrics collect(TopMetricsAggregationBuilder builder, Query query,
             CheckedConsumer<RandomIndexWriter, IOException> buildIndex, MappedFieldType... fields) throws IOException {
         InternalTopMetrics result = (InternalTopMetrics) collect((AggregationBuilder) builder, query, buildIndex, fields);
-        assertThat(result.getMetricName(), equalTo(builder.getMetricField().getFieldName()));
+        List<String> expectedFieldNames = builder.getMetricFields().stream()
+                .map(MultiValuesSourceFieldConfig::getFieldName)
+                .collect(toList());
+        assertThat(result.getMetricNames(), equalTo(expectedFieldNames));
         return result;
     }
 
@@ -471,12 +515,12 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
         }
     }
 
-    private InternalTopMetrics.TopMetric top(long sortValue, double metricValue) {
-        return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValue);
+    private InternalTopMetrics.TopMetric top(long sortValue, double... metricValues) {
+        return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
     }
 
-    private InternalTopMetrics.TopMetric top(double sortValue, double metricValue) {
-        return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValue);
+    private InternalTopMetrics.TopMetric top(double sortValue, double... metricValues) {
+        return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
     }
 
     /**

+ 17 - 17
x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml

@@ -19,7 +19,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: desc
@@ -32,7 +32,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: asc
@@ -45,7 +45,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s:
@@ -84,7 +84,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: desc
@@ -97,7 +97,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: asc
@@ -134,7 +134,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: desc
@@ -147,7 +147,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: asc
@@ -180,7 +180,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort: s.keyword
   - match: { error.root_cause.0.reason: "error building sort for field [s.keyword] of type [keyword] in index [test]: only supported on numeric fields" }
@@ -216,7 +216,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort: _score
   - match: { aggregations.tm.top.0.metrics.v: 3.1414999961853027 }
@@ -239,7 +239,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   _script:
@@ -275,7 +275,7 @@
           aggs:
             tm:
               top_metrics:
-                metric:
+                metrics:
                   field: v
                 sort:
                   _script:
@@ -313,7 +313,7 @@
           aggs:
             pop:
               top_metrics:
-                metric:
+                metrics:
                   field: population
                 sort:
                   _geo_distance:
@@ -329,7 +329,7 @@
             pop:
               top_metrics:
                 size: 3
-                metric:
+                metrics:
                   field: population
                 sort:
                   _geo_distance:
@@ -376,7 +376,7 @@
               aggs:
                 tm:
                   top_metrics:
-                    metric:
+                    metrics:
                       field: v
                     sort:
                       date: desc
@@ -401,7 +401,7 @@
               aggs:
                 tm:
                   top_metrics:
-                    metric:
+                    metrics:
                       field: v
                     sort:
                       date: desc
@@ -443,7 +443,7 @@
             tm:
               top_metrics:
                 size: 100
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: desc
@@ -463,7 +463,7 @@
             tm:
               top_metrics:
                 size: 100
-                metric:
+                metrics:
                   field: v
                 sort:
                   s: desc