Browse Source

[ML] adds support for non-numeric mapped types (#40220)

* [ML] adds support for non-numeric mapped types and mapping overrides

* correcting hlrc compilation issues after merge

* removing mapping_override option

* clearing up unnecessary changes
Benjamin Trent 6 years ago
parent
commit
a3e10a768a

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

@@ -45,7 +45,7 @@ public class DataFrameMessages {
             "Failed to create composite aggregation from pivot function";
     public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
             "Data frame transform configuration [{0}] has invalid elements";
-
+    public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
     public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
             "Failed to parse query for data frame transform";
     public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =

+ 47 - 0
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 public class DataFramePivotRestIT extends DataFrameRestTestCase {
@@ -267,6 +268,52 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         });
     }
 
+    public void testPivotWithMaxOnDateField() throws Exception {
+        String transformId = "simpleDateHistogramPivotWithMaxTime";
+        String dataFrameIndex = "pivot_reviews_via_date_histogram_with_max_time";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
+
+        final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId,
+            BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        String config = "{"
+            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
+            + " \"dest\": \"" + dataFrameIndex + "\",";
+
+        config +="    \"pivot\": { \n" +
+            "        \"group_by\": {\n" +
+            "            \"by_day\": {\"date_histogram\": {\n" +
+            "                \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" +
+            "            }}\n" +
+            "        },\n" +
+            "    \n" +
+            "    \"aggs\" :{\n" +
+            "        \"avg_rating\": {\n" +
+            "            \"avg\": {\"field\": \"stars\"}\n" +
+            "        },\n" +
+            "        \"timestamp\": {\n" +
+            "            \"max\": {\"field\": \"timestamp\"}\n" +
+            "        }\n" +
+            "    }}"
+            + "}";
+
+        createDataframeTransformRequest.setJsonEntity(config);
+        Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
+        assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+        assertTrue(indexExists(dataFrameIndex));
+
+        startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        // we expect 21 documents as there shall be 21 days worth of docs
+        Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
+        assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+        assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
+        Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15");
+        String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0);
+        // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated
+        assertThat(actual, containsString("2017-01-15T20:"));
+    }
+
     private void assertOnePivotValue(String query, double expected) throws IOException {
         Map<String, Object> searchResult = getAsMap(query);
 

+ 25 - 16
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
 import java.util.List;
@@ -57,9 +58,11 @@ public class TransportPreviewDataFrameTransformAction extends
             return;
         }
 
-        Pivot pivot = new Pivot(request.getConfig().getSource(),
-            request.getConfig().getQueryConfig().getQuery(),
-            request.getConfig().getPivotConfig());
+        final DataFrameTransformConfig config = request.getConfig();
+
+        Pivot pivot = new Pivot(config.getSource(),
+            config.getQueryConfig().getQuery(),
+            config.getPivotConfig());
 
         getPreview(pivot, ActionListener.wrap(
             previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
@@ -68,18 +71,24 @@ public class TransportPreviewDataFrameTransformAction extends
     }
 
     private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
-        ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
-            ClientHelper.DATA_FRAME_ORIGIN,
-            client,
-            SearchAction.INSTANCE,
-            pivot.buildSearchRequest(null),
-            ActionListener.wrap(
-                r -> {
-                    final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
-                    DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
-                    listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
-                },
-                listener::onFailure
-            ));
+        pivot.deduceMappings(client, ActionListener.wrap(
+            deducedMappings -> {
+                ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
+                    ClientHelper.DATA_FRAME_ORIGIN,
+                    client,
+                    SearchAction.INSTANCE,
+                    pivot.buildSearchRequest(null),
+                    ActionListener.wrap(
+                        r -> {
+                            final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
+                            DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
+                            listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList()));
+                        },
+                        listener::onFailure
+                    ));
+            },
+            listener::onFailure
+        ));
+
     }
 }

+ 3 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

@@ -44,6 +44,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
 
     protected abstract DataFrameTransformConfig getConfig();
 
+    protected abstract Map<String, String> getFieldMappings();
+
     @Override
     protected void onStartJob(long now) {
         QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery();
@@ -70,7 +72,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
         final DataFrameTransformConfig transformConfig = getConfig();
         String indexName = transformConfig.getDestination();
 
-        return pivot.extractResults(agg, getStats()).map(document -> {
+        return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> {
             XContentBuilder builder;
             try {
                 builder = jsonBuilder();

+ 28 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
 
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -230,6 +231,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
         private final String transformId;
+        private Map<String, String> fieldMappings = null;
 
         private DataFrameTransformConfig transformConfig = null;
 
@@ -248,6 +250,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return transformConfig;
         }
 
+        @Override
+        protected Map<String, String> getFieldMappings() {
+            return fieldMappings;
+        }
+
         @Override
         protected String getJobId() {
             return transformId;
@@ -279,6 +286,27 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                         DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
             }
 
+            if (fieldMappings == null) {
+                CountDownLatch latch = new CountDownLatch(1);
+                SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>(
+                    ActionListener.wrap(
+                        destinationMappings -> fieldMappings = destinationMappings,
+                        e -> {
+                            throw new RuntimeException(
+                                DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
+                                    transformConfig.getDestination()),
+                                e);
+                        }), latch));
+                try {
+                    latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                   throw new RuntimeException(
+                                DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
+                                    transformConfig.getDestination()),
+                                e);
+                }
+            }
+
             return super.maybeTriggerAsyncJob(now);
         }
 

+ 19 - 9
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java

@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
+
 final class AggregationResultUtils {
     private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
 
@@ -30,30 +32,38 @@ final class AggregationResultUtils {
      * @param agg The aggregation result
      * @param groups The original groupings used for querying
      * @param aggregationBuilders the aggregation used for querying
-     * @param dataFrameIndexerTransformStats stats collector
+     * @param fieldTypeMap A Map containing "field-name": "type" entries to determine the appropriate type for the aggregation results.
+     * @param stats stats collector
      * @return a map containing the results of the aggregation in a consumable way
      */
     public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
-                                                                     GroupConfig groups,
-                                                                     Collection<AggregationBuilder> aggregationBuilders,
-                                                                     DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
+                                                                                 GroupConfig groups,
+                                                                                 Collection<AggregationBuilder> aggregationBuilders,
+                                                                                 Map<String, String> fieldTypeMap,
+                                                                                 DataFrameIndexerTransformStats stats) {
         return agg.getBuckets().stream().map(bucket -> {
-            dataFrameIndexerTransformStats.incrementNumDocuments(bucket.getDocCount());
+            stats.incrementNumDocuments(bucket.getDocCount());
 
             Map<String, Object> document = new HashMap<>();
-            groups.getGroups().keySet().forEach(destinationFieldName -> {
-                document.put(destinationFieldName, bucket.getKey().get(destinationFieldName));
-            });
+            groups.getGroups().keySet().forEach(destinationFieldName ->
+                document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)));
 
             for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
                 String aggName = aggregationBuilder.getName();
+                final String fieldType = fieldTypeMap.get(aggName);
 
                 // TODO: support other aggregation types
                 Aggregation aggResult = bucket.getAggregations().get(aggName);
 
                 if (aggResult instanceof NumericMetricsAggregation.SingleValue) {
                     NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult;
-                    document.put(aggName, aggResultSingleValue.value());
+                    // If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose
+                    // formatted outputs.
+                    if (isNumericType(fieldType)) {
+                        document.put(aggName, aggResultSingleValue.value());
+                    } else {
+                        document.put(aggName, aggResultSingleValue.getValueAsString());
+                    }
                 } else {
                     // Execution should never reach this point!
                     // Creating transforms with unsupported aggregations shall not be possible

+ 8 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java

@@ -77,12 +77,17 @@ public class Pivot {
     }
 
     public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
-            DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
+                                                      Map<String, String> fieldTypeMap,
+                                                      DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
 
         GroupConfig groups = config.getGroupConfig();
         Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();
 
-        return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, dataFrameIndexerTransformStats);
+        return AggregationResultUtils.extractCompositeAggregationResults(agg,
+            groups,
+            aggregationBuilders,
+            fieldTypeMap,
+            dataFrameIndexerTransformStats);
     }
 
     private void runTestQuery(Client client, final ActionListener<Boolean> listener) {
@@ -99,7 +104,7 @@ public class Pivot {
             }
             listener.onResponse(true);
         }, e->{
-            listener.onFailure(new RuntimeException("Failed to test query",e));
+            listener.onFailure(new RuntimeException("Failed to test query", e));
         }));
     }
 

+ 70 - 17
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java

@@ -13,20 +13,51 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-public class SchemaUtil {
+public final class SchemaUtil {
     private static final Logger logger = LogManager.getLogger(SchemaUtil.class);
 
+    // Full collection of numeric field type strings
+    private static final Set<String> NUMERIC_FIELD_MAPPER_TYPES;
+    static {
+        Set<String> types = Stream.of(NumberFieldMapper.NumberType.values())
+            .map(NumberFieldMapper.NumberType::typeName)
+            .collect(Collectors.toSet());
+        types.add("scaled_float"); // have to add manually since scaled_float is in a module
+        NUMERIC_FIELD_MAPPER_TYPES = types;
+    }
+
     private SchemaUtil() {
     }
 
-    public static void deduceMappings(final Client client, final PivotConfig config, final String source,
+    public static boolean isNumericType(String type) {
+        return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
+    }
+
+    /**
+     * Deduce the mappings for the destination index given the source index
+     *
+     * The Listener is alerted with a {@code Map<String, String>} that is a "field-name":"type" mapping
+     *
+     * @param client Client from which to make requests against the cluster
+     * @param config The PivotConfig for which to deduce destination mapping
+     * @param source Source index that contains the data to pivot
+     * @param listener Listener to alert on success or failure.
+     */
+    public static void deduceMappings(final Client client,
+                                      final PivotConfig config,
+                                      final String source,
                                       final ActionListener<Map<String, String>> listener) {
         // collects the fieldnames used as source for aggregations
         Map<String, String> aggregationSourceFieldNames = new HashMap<>();
@@ -56,18 +87,42 @@ public class SchemaUtil {
         allFieldNames.putAll(fieldNamesForGrouping);
 
         getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
-                ActionListener.wrap(sourceMappings -> {
-                    Map<String, String> targetMapping = resolveMappings(aggregationSourceFieldNames, aggregationTypes,
-                            fieldNamesForGrouping, sourceMappings);
-
-                    listener.onResponse(targetMapping);
-                }, e -> {
-                    listener.onFailure(e);
-                }));
+                ActionListener.wrap(
+                    sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
+                        aggregationTypes,
+                        fieldNamesForGrouping,
+                        sourceMappings)),
+                    listener::onFailure));
+    }
+
+    /**
+     * Gathers the field mappings for the "destination" index. Listener will receive an error, or a {@code Map<String, String>} of
+     * "field-name":"type".
+     *
+     * @param client Client used to execute the request
+     * @param index The index, or index pattern, from which to gather all the field mappings
+     * @param listener The listener to be alerted on success or failure.
+     */
+    public static void getDestinationFieldMappings(final Client client,
+                                                   final String index,
+                                                   final ActionListener<Map<String, String>> listener) {
+        GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
+        fieldMappingRequest.indices(index);
+        fieldMappingRequest.fields("*");
+        ClientHelper.executeAsyncWithOrigin(client,
+            ClientHelper.DATA_FRAME_ORIGIN,
+            GetFieldMappingsAction.INSTANCE,
+            fieldMappingRequest,
+            ActionListener.wrap(
+                r -> listener.onResponse(extractFieldMappings(r.mappings())),
+                listener::onFailure
+            ));
     }
 
     private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
-            Map<String, String> aggregationTypes, Map<String, String> fieldNamesForGrouping, Map<String, String> sourceMappings) {
+                                                       Map<String, String> aggregationTypes,
+                                                       Map<String, String> fieldNamesForGrouping,
+                                                       Map<String, String> sourceMappings) {
         Map<String, String> targetMapping = new HashMap<>();
 
         aggregationTypes.forEach((targetFieldName, aggregationName) -> {
@@ -107,14 +162,12 @@ public class SchemaUtil {
         fieldMappingRequest.indices(index);
         fieldMappingRequest.fields(fields);
 
-        client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(response -> {
-            listener.onResponse(extractSourceFieldMappings(response.mappings()));
-        }, e -> {
-            listener.onFailure(e);
-        }));
+        client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(
+            response -> listener.onResponse(extractFieldMappings(response.mappings())),
+            listener::onFailure));
     }
 
-    private static Map<String, String> extractSourceFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
+    private static Map<String, String> extractFieldMappings(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
         Map<String, String> extractedTypes = new HashMap<>();
 
         mappings.forEach((indexName, docTypeToMapping) -> {

+ 132 - 6
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java

@@ -140,8 +140,11 @@ public class AggregationResultUtilsTests extends ESTestCase {
                         aggName, 12.55
                         )
                 );
-
-        executeTest(groupBy, aggregationBuilders, input, expected, 20);
+        Map<String, String> fieldTypeMap = asStringMap(
+            targetField, "keyword",
+            aggName, "double"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20);
     }
 
     public void testExtractCompositeAggregationResultsMultiSources() throws IOException {
@@ -222,7 +225,12 @@ public class AggregationResultUtilsTests extends ESTestCase {
                         aggName, 12.55
                         )
                 );
-        executeTest(groupBy, aggregationBuilders, input, expected, 10);
+        Map<String, String> fieldTypeMap = asStringMap(
+            aggName, "double",
+            targetField, "keyword",
+            targetField2, "keyword"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
     }
 
     public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException {
@@ -287,11 +295,119 @@ public class AggregationResultUtilsTests extends ESTestCase {
                         aggName2, -2.44
                         )
                 );
-        executeTest(groupBy, aggregationBuilders, input, expected, 200);
+        Map<String, String> fieldTypeMap = asStringMap(
+            targetField, "keyword",
+            aggName, "double",
+            aggName2, "double"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 200);
+    }
+
+    public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException {
+        String targetField = randomAlphaOfLengthBetween(5, 10);
+        String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2";
+
+        GroupConfig groupBy = parseGroupConfig("{"
+            + "\"" + targetField + "\" : {"
+            + "  \"terms\" : {"
+            + "     \"field\" : \"doesn't_matter_for_this_test\""
+            + "  } },"
+            + "\"" + targetField2 + "\" : {"
+            + "  \"terms\" : {"
+            + "     \"field\" : \"doesn't_matter_for_this_test\""
+            + "  } }"
+            + "}");
+
+        String aggName = randomAlphaOfLengthBetween(5, 10);
+        String aggTypedName = "avg#" + aggName;
+
+        String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2";
+        String aggTypedName2 = "max#" + aggName2;
+
+        Collection<AggregationBuilder> aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2));
+
+        Map<String, Object> input = asMap(
+            "buckets",
+            asList(
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID1",
+                        targetField2, "ID1_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 42.33),
+                    aggTypedName2, asMap(
+                        "value", 9.9),
+                    DOC_COUNT, 1),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID1",
+                        targetField2, "ID2_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 8.4),
+                    aggTypedName2, asMap(
+                        "value", 222.33),
+                    DOC_COUNT, 2),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID2",
+                        targetField2, "ID1_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 28.99),
+                    aggTypedName2, asMap(
+                        "value", -2.44),
+                    DOC_COUNT, 3),
+                asMap(
+                    KEY, asMap(
+                        targetField, "ID3",
+                        targetField2, "ID2_2"
+                    ),
+                    aggTypedName, asMap(
+                        "value", 12.55),
+                    aggTypedName2, asMap(
+                        "value", -100.44),
+                    DOC_COUNT, 4)
+            ));
+
+        List<Map<String, Object>> expected = asList(
+            asMap(
+                targetField, "ID1",
+                targetField2, "ID1_2",
+                aggName, 42.33,
+                aggName2, "9.9"
+            ),
+            asMap(
+                targetField, "ID1",
+                targetField2, "ID2_2",
+                aggName, 8.4,
+                aggName2, "222.33"
+            ),
+            asMap(
+                targetField, "ID2",
+                targetField2, "ID1_2",
+                aggName, 28.99,
+                aggName2, "-2.44"
+            ),
+            asMap(
+                targetField, "ID3",
+                targetField2, "ID2_2",
+                aggName, 12.55,
+                aggName2, "-100.44"
+            )
+        );
+        Map<String, String> fieldTypeMap = asStringMap(
+            aggName, "double",
+            aggName2, "keyword", // If the second aggregation was some non-numeric mapped field
+            targetField, "keyword",
+            targetField2, "keyword"
+        );
+        executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
     }
 
     private void executeTest(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders, Map<String, Object> input,
-            List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
+            Map<String, String> fieldTypeMap, List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
         DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
         XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
         builder.map(input);
@@ -299,7 +415,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
         try (XContentParser parser = createParser(builder)) {
             CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature");
             List<Map<String, Object>> result = AggregationResultUtils
-                    .extractCompositeAggregationResults(agg, groups, aggregationBuilders, stats).collect(Collectors.toList());
+                    .extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats).collect(Collectors.toList());
 
             assertEquals(expected, result);
             assertEquals(expectedDocCounts, stats.getNumDocuments());
@@ -321,4 +437,14 @@ public class AggregationResultUtilsTests extends ESTestCase {
         }
         return map;
     }
+
+    static Map<String, String> asStringMap(String... strings) {
+        assert strings.length % 2 == 0;
+        final Map<String, String> map = new HashMap<>();
+        for (int i = 0; i < strings.length; i += 2) {
+            String field = strings[i];
+            map.put(field, strings[i + 1]);
+        }
+        return map;
+    }
 }

+ 15 - 3
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java

@@ -37,7 +37,9 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -97,7 +99,9 @@ public class PivotTests extends ESTestCase {
     public void testSearchFailure() throws Exception {
         // test a failure during the search operation, transform creation fails if
         // search has failures although they might just be temporary
-        Pivot pivot = new Pivot("existing_source_index_with_failing_shards", new MatchAllQueryBuilder(), getValidPivotConfig());
+        Pivot pivot = new Pivot("existing_source_index_with_failing_shards",
+            new MatchAllQueryBuilder(),
+            getValidPivotConfig());
 
         assertInvalidTransform(client, pivot);
     }
@@ -106,7 +110,9 @@ public class PivotTests extends ESTestCase {
         for (String agg : supportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
+            Pivot pivot = new Pivot("existing_source",
+                new MatchAllQueryBuilder(),
+                getValidPivotConfig(aggregationConfig));
 
             assertValidTransform(client, pivot);
         }
@@ -116,7 +122,9 @@ public class PivotTests extends ESTestCase {
         for (String agg : unsupportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
+            Pivot pivot = new Pivot("existing_source",
+                new MatchAllQueryBuilder(),
+                getValidPivotConfig(aggregationConfig));
 
             assertInvalidTransform(client, pivot);
         }
@@ -178,6 +186,10 @@ public class PivotTests extends ESTestCase {
                 + "    }\n" + "  }" + "}");
     }
 
+    private Map<String, String> getFieldMappings() {
+        return Collections.singletonMap("values", "double");
+    }
+
     private AggregationConfig parseAggregations(String json) throws IOException {
         final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
                 DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);

+ 23 - 1
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -105,7 +105,6 @@ setup:
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform" }
   - match: { transforms.1.id: "airline-transform-dos" }
-
   - do:
       data_frame.get_data_frame_transform:
         transform_id: "airline-transform,airline-transform-dos"
@@ -135,6 +134,7 @@ setup:
         size: 1
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-dos" }
+
 ---
 "Test transform with invalid page parameter":
   - do:
@@ -143,3 +143,25 @@ setup:
         transform_id: "_all"
         from: 0
         size: 10000
+
+---
+"Verify put transform creates destination index with appropriate mapping":
+  - do:
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": "airline-data",
+            "dest": "airline-data-by-airline",
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}}
+            }
+          }
+  - match: { acknowledged: true }
+  - do:
+      indices.get_mapping:
+        index: airline-data-by-airline
+  - match: { airline-data-by-airline.mappings.properties.airline.type: keyword }
+  - match: { airline-data-by-airline.mappings.properties.avg_response.type: double }
+  - match: { airline-data-by-airline.mappings.properties.time.type: date }