Browse Source

[Transform] avoid mapping problems with index templates (#51368)

insert explict mappings for objects in nested output to avoid clashes with index templates

fixes #51321
Hendrik Muhs 5 years ago
parent
commit
4d11e1ad24

+ 105 - 0
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java

@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.integration;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
+    private static boolean indicesCreated = false;
+
+    // preserve indices in order to reuse source indices in several test cases
+    @Override
+    protected boolean preserveIndicesUponCompletion() {
+        return true;
+    }
+
+    @Before
+    public void createIndexes() throws IOException {
+
+        // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
+        if (indicesCreated) {
+            return;
+        }
+
+        createReviewsIndex();
+        indicesCreated = true;
+    }
+
+    public void testIndexTemplateMappingClash() throws Exception {
+        String transformId = "special_pivot_template_mappings_clash";
+        String transformIndex = "special_pivot_template_mappings_clash";
+
+        // create a template that defines a field "rating" with a type "float" which will clash later with
+        // output field "rating.avg" in the pivot config
+        final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template");
+
+        String template = "{"
+            + "\"index_patterns\" : [\"special_pivot_template*\"],"
+            + "  \"mappings\" : {"
+            + "    \"properties\": {"
+            + "      \"rating\":{"
+            + "        \"type\": \"float\"\n"
+            + "      }"
+            + "    }"
+            + "  }"
+            + "}";
+
+        createIndexTemplateRequest.setJsonEntity(template);
+        Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
+        assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
+
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"rating.avg\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } }"
+            + " } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex);
+        assertTrue(indexExists(transformIndex));
+
+        // we expect 27 documents as there shall be 27 user_id's
+        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
+        assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // get and check some users
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
+
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
+        assertEquals(3.878048780, actual.doubleValue(), 0.000001);
+    }
+}

+ 3 - 0
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -427,6 +427,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
 
         Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
+
+        // assert that the transform did not fail
+        assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
         return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
     }
 

+ 94 - 47
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

@@ -42,8 +42,7 @@ public final class SchemaUtil {
         NUMERIC_FIELD_MAPPER_TYPES = types;
     }
 
-    private SchemaUtil() {
-    }
+    private SchemaUtil() {}
 
     public static boolean isNumericType(String type) {
         return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
@@ -59,10 +58,12 @@ public final class SchemaUtil {
      * @param source Source index that contains the data to pivot
      * @param listener Listener to alert on success or failure.
      */
-    public static void deduceMappings(final Client client,
-                                      final PivotConfig config,
-                                      final String[] source,
-                                      final ActionListener<Map<String, String>> listener) {
+    public static void deduceMappings(
+        final Client client,
+        final PivotConfig config,
+        final String[] source,
+        final ActionListener<Map<String, String>> listener
+    ) {
         // collects the fieldnames used as source for aggregations
         Map<String, String> aggregationSourceFieldNames = new HashMap<>();
         // collects the aggregation types by source name
@@ -70,16 +71,16 @@ public final class SchemaUtil {
         // collects the fieldnames and target fieldnames used for grouping
         Map<String, String> fieldNamesForGrouping = new HashMap<>();
 
-        config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> {
-            fieldNamesForGrouping.put(destinationFieldName, group.getField());
-        });
+        config.getGroupConfig()
+            .getGroups()
+            .forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
 
         for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
             if (agg instanceof ValuesSourceAggregationBuilder) {
                 ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
                 aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
                 aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
-            } else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
+            } else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
                 aggregationTypes.put(agg.getName(), agg.getType());
             } else {
                 // execution should not reach this point
@@ -98,13 +99,17 @@ public final class SchemaUtil {
         allFieldNames.putAll(aggregationSourceFieldNames);
         allFieldNames.putAll(fieldNamesForGrouping);
 
-        getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
+        getSourceFieldMappings(
+            client,
+            source,
+            allFieldNames.values().toArray(new String[0]),
             ActionListener.wrap(
-                sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
-                    aggregationTypes,
-                    fieldNamesForGrouping,
-                    sourceMappings)),
-                listener::onFailure));
+                sourceMappings -> listener.onResponse(
+                    resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
+                ),
+                listener::onFailure
+            )
+        );
     }
 
     /**
@@ -115,27 +120,29 @@ public final class SchemaUtil {
      * @param index The index, or index pattern, from which to gather all the field mappings
      * @param listener The listener to be alerted on success or failure.
      */
-    public static void getDestinationFieldMappings(final Client client,
-                                                   final String index,
-                                                   final ActionListener<Map<String, String>> listener) {
-        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
-            .indices(index)
+    public static void getDestinationFieldMappings(
+        final Client client,
+        final String index,
+        final ActionListener<Map<String, String>> listener
+    ) {
+        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
             .fields("*")
             .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
-        ClientHelper.executeAsyncWithOrigin(client,
+        ClientHelper.executeAsyncWithOrigin(
+            client,
             ClientHelper.TRANSFORM_ORIGIN,
             FieldCapabilitiesAction.INSTANCE,
             fieldCapabilitiesRequest,
-            ActionListener.wrap(
-                r -> listener.onResponse(extractFieldMappings(r)),
-                listener::onFailure
-            ));
+            ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure)
+        );
     }
 
-    private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
-                                                       Map<String, String> aggregationTypes,
-                                                       Map<String, String> fieldNamesForGrouping,
-                                                       Map<String, String> sourceMappings) {
+    private static Map<String, String> resolveMappings(
+        Map<String, String> aggregationSourceFieldNames,
+        Map<String, String> aggregationTypes,
+        Map<String, String> fieldNamesForGrouping,
+        Map<String, String> sourceMappings
+    ) {
         Map<String, String> targetMapping = new HashMap<>();
 
         aggregationTypes.forEach((targetFieldName, aggregationName) -> {
@@ -143,8 +150,7 @@ public final class SchemaUtil {
             String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
             String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
 
-            logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
-                targetFieldName, aggregationName, destinationMapping);
+            logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);
 
             if (Aggregations.isDynamicMapping(destinationMapping)) {
                 logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
@@ -165,34 +171,75 @@ public final class SchemaUtil {
                 targetMapping.put(targetFieldName, "keyword");
             }
         });
+
+        // insert object mappings for nested fields
+        insertNestedObjectMappings(targetMapping);
+
         return targetMapping;
     }
 
     /*
      * Very "magic" helper method to extract the source mappings
      */
-    private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
-                                               ActionListener<Map<String, String>> listener) {
-        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
-            .indices(index)
+    private static void getSourceFieldMappings(
+        Client client,
+        String[] index,
+        String[] fields,
+        ActionListener<Map<String, String>> listener
+    ) {
+        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
             .fields(fields)
             .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
-        client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
-            response -> listener.onResponse(extractFieldMappings(response)),
-            listener::onFailure));
+        client.execute(
+            FieldCapabilitiesAction.INSTANCE,
+            fieldCapabilitiesRequest,
+            ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)
+        );
     }
 
     private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
         Map<String, String> extractedTypes = new HashMap<>();
 
-        response.get().forEach((fieldName, capabilitiesMap) -> {
-            // TODO: overwrites types, requires resolve if
-            // types are mixed
-            capabilitiesMap.forEach((name, capability) -> {
-                logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
-                extractedTypes.put(fieldName, capability.getType());
-            });
-        });
+        response.get()
+            .forEach(
+                (fieldName, capabilitiesMap) -> {
+                    // TODO: overwrites types, requires resolve if
+                    // types are mixed
+                    capabilitiesMap.forEach((name, capability) -> {
+                        logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
+                        extractedTypes.put(fieldName, capability.getType());
+                    });
+                }
+            );
         return extractedTypes;
     }
+
+    /**
+     * Insert object mappings for fields like:
+     *
+     * a.b.c : some_type
+     *
+     * in which case it creates additional mappings:
+     *
+     * a.b : object
+     * a : object
+     *
+     * avoids snafu with index templates injecting incompatible mappings
+     *
+     * @param fieldMappings field mappings to inject to
+     */
+    static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
+        Map<String, String> additionalMappings = new HashMap<>();
+        fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
+            int pos;
+            String objectKey = key;
+            // lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0
+            while ((pos = objectKey.lastIndexOf(".")) > 0) {
+                objectKey = objectKey.substring(0, pos);
+                additionalMappings.putIfAbsent(objectKey, "object");
+            }
+        });
+
+        additionalMappings.forEach(fieldMappings::putIfAbsent);
+    }
 }

+ 57 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtilTests.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.pivot;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaUtilTests extends ESTestCase {
+
+    public void testInsertNestedObjectMappings() {
+        Map<String, String> fieldMappings = new HashMap<>() {
+            {
+                // creates: a.b, a
+                put("a.b.c", "long");
+                put("a.b.d", "double");
+                // creates: c.b, c
+                put("c.b.a", "double");
+                // creates: c.d
+                put("c.d.e", "object");
+                put("d", "long");
+                put("e.f.g", "long");
+                // cc: already there
+                put("e.f", "object");
+                // cc: already there but different type (should not be possible)
+                put("e", "long");
+                // cc: start with . (should not be possible)
+                put(".x", "long");
+                // cc: start and ends with . (should not be possible), creates: .y
+                put(".y.", "long");
+                // cc: ends with . (should not be possible), creates: .z
+                put(".z.", "long");
+            }
+        };
+
+        SchemaUtil.insertNestedObjectMappings(fieldMappings);
+
+        assertEquals(18, fieldMappings.size());
+        assertEquals("long", fieldMappings.get("a.b.c"));
+        assertEquals("object", fieldMappings.get("a.b"));
+        assertEquals("double", fieldMappings.get("a.b.d"));
+        assertEquals("object", fieldMappings.get("a"));
+        assertEquals("object", fieldMappings.get("c.d"));
+        assertEquals("object", fieldMappings.get("e.f"));
+        assertEquals("long", fieldMappings.get("e"));
+        assertEquals("object", fieldMappings.get(".y"));
+        assertEquals("object", fieldMappings.get(".z"));
+        assertFalse(fieldMappings.containsKey("."));
+        assertFalse(fieldMappings.containsKey(""));
+    }
+
+}