
[ML] properly nesting objects in document source (#41901)

* [ML] properly nesting objects in document source

* Throw exception on agg extraction failure, cause it to fail df

* throwing error to stop df if unsupported agg is found
Benjamin Trent 6 years ago
parent
commit c1d31f6064

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -606,7 +607,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
 
         private boolean isIrrecoverableFailure(Exception e) {
-            return e instanceof IndexNotFoundException;
+            return e instanceof IndexNotFoundException || e instanceof AggregationResultUtils.AggregationExtractionException;
         }
 
         synchronized void handleFailure(Exception e) {
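The body of handleFailure lies outside this hunk; below is a minimal sketch of the gating the widened predicate enables. The retry limit, the failure counter, and the failTransform helper are assumptions for illustration, not the actual task implementation.

import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;

// Hypothetical, simplified model of the failure handling the changed predicate feeds into.
class FailureHandlingSketch {
    private static final int MAX_CONTINUOUS_FAILURES = 10; // assumed limit
    private final AtomicInteger failureCount = new AtomicInteger(0);

    private boolean isIrrecoverableFailure(Exception e) {
        // Mirrors the change above: an unsupported or conflicting aggregation is now
        // treated like a missing index -- retrying the indexer cannot succeed.
        return e instanceof IndexNotFoundException
            || e instanceof AggregationResultUtils.AggregationExtractionException;
    }

    synchronized void handleFailure(Exception e) {
        if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
            failTransform("task encountered irrecoverable failure: " + e.getMessage());
        }
        // Otherwise only the counter is bumped and the next trigger retries the run.
    }

    private void failTransform(String reason) {
        // Hypothetical helper: the real task transitions to a failed state and audits the reason.
    }
}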

+ 53 - 6
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms.pivot;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@@ -29,7 +30,7 @@ import java.util.stream.Stream;
 
 import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
 
-final class AggregationResultUtils {
+public final class AggregationResultUtils {
     private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
 
     /**
@@ -77,17 +78,18 @@ final class AggregationResultUtils {
                     //    gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
                     if (isNumericType(fieldType) ||
                         (aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) {
-                        document.put(aggName, aggResultSingleValue.value());
+                        updateDocument(document, aggName, aggResultSingleValue.value());
                     } else {
-                        document.put(aggName, aggResultSingleValue.getValueAsString());
+                        updateDocument(document, aggName, aggResultSingleValue.getValueAsString());
                     }
                 } else if (aggResult instanceof ScriptedMetric) {
-                    document.put(aggName, ((ScriptedMetric) aggResult).aggregation());
+                    updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation());
                 } else {
                     // Execution should never reach this point!
                     // Creating transforms with unsupported aggregations shall not be possible
-                    logger.error("Dataframe Internal Error: unsupported aggregation ["+ aggResult.getName() +"], ignoring");
-                    assert false;
+                    throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]",
+                        aggResult.getType(),
+                        aggResult.getName());
                 }
             }
 
@@ -97,4 +99,49 @@ final class AggregationResultUtils {
         });
     }
 
+    @SuppressWarnings("unchecked")
+    static void updateDocument(Map<String, Object> document, String fieldName, Object value) {
+        String[] fieldTokens = fieldName.split("\\.");
+        if (fieldTokens.length == 1) {
+            document.put(fieldName, value);
+            return;
+        }
+        Map<String, Object> internalMap = document;
+        for (int i = 0; i < fieldTokens.length; i++) {
+            String token = fieldTokens[i];
+            if (i == fieldTokens.length - 1) {
+                if (internalMap.containsKey(token)) {
+                    if (internalMap.get(token) instanceof Map) {
+                        throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
+                            fieldName);
+                    } else {
+                        throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
+                            fieldName,
+                            internalMap.get(token),
+                            value);
+                    }
+                }
+                internalMap.put(token, value);
+            } else {
+                if (internalMap.containsKey(token)) {
+                    if (internalMap.get(token) instanceof Map) {
+                        internalMap = (Map<String, Object>)internalMap.get(token);
+                    } else {
+                        throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
+                            fieldName);
+                    }
+                } else {
+                    Map<String, Object> newMap = new HashMap<>();
+                    internalMap.put(token, newMap);
+                    internalMap = newMap;
+                }
+            }
+        }
+    }
+
+    public static class AggregationExtractionException extends ElasticsearchException {
+        AggregationExtractionException(String msg, Object... args) {
+            super(msg, args);
+        }
+    }
 }
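For reference, a short usage sketch of the new helper (not part of the commit); it sits in the same package because updateDocument is package-private, and the values are illustrative.

package org.elasticsearch.xpack.dataframe.transforms.pivot;

import java.util.HashMap;
import java.util.Map;

class UpdateDocumentUsageSketch {
    static Map<String, Object> example() {
        Map<String, Object> document = new HashMap<>();
        AggregationResultUtils.updateDocument(document, "time.max", "2017-02-18T00:30:00.000Z");
        AggregationResultUtils.updateDocument(document, "time.min", "2017-02-18T00:00:00.000Z");
        AggregationResultUtils.updateDocument(document, "avg_response", 42.0);
        // document now nests the dotted names instead of using flat keys:
        // { "time": { "max": "2017-02-18T00:30:00.000Z",
        //             "min": "2017-02-18T00:00:00.000Z" },
        //   "avg_response": 42.0 }

        // Conflicting writes throw AggregationExtractionException instead of silently
        // overwriting earlier values:
        //   updateDocument(document, "time.max", 7L)           -> duplicate key
        //   updateDocument(document, "avg_response.value", 7L) -> mixed nested and non-nested
        return document;
    }
}

The behaviour matches the unit tests added below.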

+ 46 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java

@@ -66,6 +66,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 public class AggregationResultUtilsTests extends ESTestCase {
 
@@ -736,6 +737,51 @@ public class AggregationResultUtilsTests extends ESTestCase {
         assertEquals(documentIdsFirstRun, documentIdsSecondRun);
     }
 
+    @SuppressWarnings("unchecked")
+    public void testUpdateDocument() {
+        Map<String, Object> document = new HashMap<>();
+
+        AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
+        AggregationResultUtils.updateDocument(document, "foo.bar.baz2", 2000L);
+        AggregationResultUtils.updateDocument(document, "bar.field1", 1L);
+        AggregationResultUtils.updateDocument(document, "metric", 10L);
+
+        assertThat(document.get("metric"), equalTo(10L));
+
+        Map<String, Object> bar = (Map<String, Object>)document.get("bar");
+
+        assertThat(bar.get("field1"), equalTo(1L));
+
+        Map<String, Object> foo = (Map<String, Object>)document.get("foo");
+        Map<String, Object> foobar = (Map<String, Object>)foo.get("bar");
+
+        assertThat(foobar.get("baz"), equalTo(1000L));
+        assertThat(foobar.get("baz2"), equalTo(2000L));
+    }
+
+    public void testUpdateDocumentWithDuplicate() {
+        Map<String, Object> document = new HashMap<>();
+
+        AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
+        AggregationResultUtils.AggregationExtractionException exception =
+            expectThrows(AggregationResultUtils.AggregationExtractionException.class,
+                () -> AggregationResultUtils.updateDocument(document, "foo.bar.baz", 2000L));
+        assertThat(exception.getMessage(),
+            equalTo("duplicate key value pairs key [foo.bar.baz] old value [1000] duplicate value [2000]"));
+    }
+
+    public void testUpdateDocumentWithObjectAndNotObject() {
+        Map<String, Object> document = new HashMap<>();
+
+        AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
+        AggregationResultUtils.AggregationExtractionException exception =
+            expectThrows(AggregationResultUtils.AggregationExtractionException.class,
+                () -> AggregationResultUtils.updateDocument(document, "foo.bar", 2000L));
+        assertThat(exception.getMessage(),
+            equalTo("mixed object types of nested and non-nested fields [foo.bar]"));
+    }
+
+
     private void executeTest(GroupConfig groups,
                              Collection<AggregationBuilder> aggregationBuilders,
                              Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,

+ 11 - 1
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml

@@ -81,18 +81,28 @@ setup:
               "group_by": {
                 "airline": {"terms": {"field": "airline"}},
                 "by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
-              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+              "aggs": {
+                "avg_response": {"avg": {"field": "responsetime"}},
+                "time.max": {"max": {"field": "time"}},
+                "time.min": {"min": {"field": "time"}}
+              }
             }
           }
   - match: { preview.0.airline: foo }
   - match: { preview.0.by-hour: "2017-02-49 00" }
   - match: { preview.0.avg_response: 1.0 }
+  - match: { preview.0.time.max: "2017-02-18T00:30:00.000Z" }
+  - match: { preview.0.time.min: "2017-02-18T00:00:00.000Z" }
   - match: { preview.1.airline: bar }
   - match: { preview.1.by-hour: "2017-02-49 01" }
   - match: { preview.1.avg_response: 42.0 }
+  - match: { preview.1.time.max: "2017-02-18T01:00:00.000Z" }
+  - match: { preview.1.time.min: "2017-02-18T01:00:00.000Z" }
   - match: { preview.2.airline: foo }
   - match: { preview.2.by-hour: "2017-02-49 01" }
   - match: { preview.2.avg_response: 42.0 }
+  - match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
+  - match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
 
 ---
 "Test preview transform with invalid config":